Merge pull request #1005 from akka/wip-2877-revert-patriknw

Revert "Propagate statusPromise down to the actual change in throttler, ...
This commit is contained in:
Patrik Nordwall 2013-01-08 12:02:29 -08:00
commit 67297b516e
16 changed files with 22 additions and 85 deletions

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.{ OnlyCauseStackTrace, AkkaException }

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import java.util.concurrent.TimeUnit._

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.remote.FailureDetector.Clock

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import scala.language.postfixOps
@ -403,12 +400,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
val accepting: Receive = {
case ManagementCommand(cmd, statusPromise)
val allStatuses = transportMapping.values map { transport
val p = Promise[Boolean]()
transport.managementCommand(cmd, p)
p.future
}
statusPromise completeWith Future.fold(allStatuses)(true)(_ && _)
transportMapping.values foreach { _.managementCommand(cmd, statusPromise) }
case s @ Send(message, senderOption, recipientRef)
val recipientAddress = recipientRef.path.address

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote
import akka.event.{ LoggingAdapter, Logging }

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import scala.language.postfixOps

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import akka.AkkaException

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import akka.{ OnlyCauseStackTrace, AkkaException }
@ -378,8 +375,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case Disassociate
stop()
case Heartbeat
failureDetector.heartbeat(); stay()
case Heartbeat failureDetector.heartbeat(); stay()
case Payload(payload) stateData match {
case AssociatedWaitHandler(handlerFuture, wrappedHandle, queue)

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import FailureInjectorTransportAdapter._

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import TestTransport._

View file

@ -1,12 +1,8 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import ThrottlerTransportAdapter._
import akka.actor._
import akka.pattern.pipe
import akka.remote.EndpointManager.ManagementCommand
import akka.remote.transport.ActorTransportAdapter.AssociateUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenUnderlying
import akka.remote.transport.ActorTransportAdapter.ListenerRegistered
@ -21,7 +17,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec
import scala.collection.immutable.Queue
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.math.min
import scala.util.{ Success, Failure }
@ -112,8 +107,10 @@ class ThrottlerTransportAdapter(_wrappedTransport: Transport, _system: ExtendedA
protected def managerProps = Props(new ThrottlerManager(wrappedTransport))
override def managementCommand(cmd: Any, statusPromise: Promise[Boolean]): Unit = cmd match {
case s: SetThrottle manager ! ManagementCommand(s, statusPromise)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
case s @ SetThrottle(_, _, _)
manager ! s
statusPromise.success(true)
case _ wrappedTransport.managementCommand(cmd, statusPromise)
}
}
@ -167,22 +164,17 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
wrappedHandle.readHandlerPromise.future.map { (_, inMode) } pipeTo wrappedHandle.throttlerActor
handleTable ::= nakedAddress(naked) -> wrappedHandle
statusPromise.success(wrappedHandle)
case ManagementCommand(SetThrottle(address, direction, mode), statusPromise)
case s @ SetThrottle(address, direction, mode)
val naked = nakedAddress(address)
throttlingModes += naked -> (mode, direction)
val allStatuses = handleTable.map {
case (`naked`, handle)
val p = Promise[Boolean]()
setMode(handle, mode, direction, p)
p.future
case _ Future.successful(true)
handleTable.foreach {
case (addr, handle)
if (addr == naked) setMode(handle, mode, direction)
}
statusPromise completeWith Future.fold(allStatuses)(true)(_ && _)
case Checkin(origin, handle)
val naked: Address = nakedAddress(origin)
handleTable ::= naked -> handle
setMode(naked, handle, Promise[Boolean]())
setMode(naked, handle)
}
@ -200,19 +192,16 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
}
}
private def setMode(nakedAddress: Address, handle: ThrottlerHandle, statusPromise: Promise[Boolean]): Unit = {
private def setMode(nakedAddress: Address, handle: ThrottlerHandle): Unit = {
throttlingModes.get(nakedAddress) match {
case Some((mode, direction)) setMode(handle, mode, direction, statusPromise)
case None setMode(handle, Unthrottled, Direction.Both, statusPromise)
case Some((mode, direction)) setMode(handle, mode, direction)
case None setMode(handle, Unthrottled, Direction.Both)
}
}
private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction,
statusPromise: Promise[Boolean]): Unit = {
if (direction.includes(Direction.Send))
handle.outboundThrottleMode.set(mode)
if (direction.includes(Direction.Receive))
handle.throttlerActor ! ManagementCommand(mode, statusPromise)
private def setMode(handle: ThrottlerHandle, mode: ThrottleMode, direction: Direction): Unit = {
if (direction.includes(Direction.Receive)) handle.throttlerActor ! mode
if (direction.includes(Direction.Send)) handle.outboundThrottleMode.set(mode)
}
private def wrapHandle(originalHandle: AssociationHandle, listener: AssociationEventListener, inbound: Boolean): ThrottlerHandle = {
@ -297,9 +286,9 @@ private[transport] class ThrottledAssociation(
case Event(InboundPayload(p), _)
throttledMessages = throttledMessages enqueue p
stay()
case Event(ManagementCommand(mode: ThrottleMode, statusPromise), ExposedHandle(exposedHandle))
case Event(mode: ThrottleMode, ExposedHandle(exposedHandle))
inboundThrottleMode = mode
try if (mode == Blackhole) {
if (inboundThrottleMode == Blackhole) {
throttledMessages = Queue.empty[ByteString]
exposedHandle.disassociate()
stop()
@ -307,7 +296,7 @@ private[transport] class ThrottledAssociation(
associationHandler notify InboundAssociation(exposedHandle)
exposedHandle.readHandlerPromise.future pipeTo self
goto(WaitUpstreamListener)
} finally statusPromise.success(true)
}
}
when(WaitUpstreamListener) {
@ -332,13 +321,12 @@ private[transport] class ThrottledAssociation(
}
when(Throttling) {
case Event(ManagementCommand(mode: ThrottleMode, statusPromise), _)
case Event(mode: ThrottleMode, _)
inboundThrottleMode = mode
if (mode == Blackhole) throttledMessages = Queue.empty[ByteString]
if (inboundThrottleMode == Blackhole) throttledMessages = Queue.empty[ByteString]
cancelTimer(DequeueTimerName)
if (throttledMessages.nonEmpty)
scheduleDequeue(inboundThrottleMode.timeToAvailable(System.nanoTime(), throttledMessages.head.length))
statusPromise.success(true)
stay()
case Event(InboundPayload(p), _)
forwardOrDelay(p)

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport
import scala.concurrent.{ Promise, Future }

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.AkkaException

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.{ OnlyCauseStackTrace, ConfigurationException }

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.actor.Address

View file

@ -1,6 +1,3 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.remote.transport.netty
import akka.actor.Address