Fixes by Roland:

- exhaustive matches in AkkaProtocolTransport
 - handling of Terminated in flushing state in Remoting
 - various fixes of warnings
This commit is contained in:
Endre Sándor Varga 2012-12-19 15:12:29 +01:00
parent 4dcb38c758
commit 37877fa3ed
5 changed files with 12 additions and 15 deletions

View file

@ -169,7 +169,7 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
stopped map { _ () } // RARP needs only type Unit, not a boolean stopped map { _ () } // RARP needs only type Unit, not a boolean
case None case None
log.warning("Remoting is not running. Ignoring shutdown attempt.") log.warning("Remoting is not running. Ignoring shutdown attempt.")
Future successful () Future successful (())
} }
} }
@ -463,6 +463,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
def flushing: Receive = { def flushing: Receive = {
case s: Send forwardToDeadLetters(s) case s: Send forwardToDeadLetters(s)
case InboundAssociation(h) h.disassociate() case InboundAssociation(h) h.disassociate()
case Terminated(_) // why should we care now?
} }
private def forwardToDeadLetters(s: Send): Unit = { private def forwardToDeadLetters(s: Send): Unit = {
@ -551,4 +552,4 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
} }
} }
} }

View file

@ -211,7 +211,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
} }
} }
} }
Future successful () Future successful (())
} }
def send( def send(
@ -271,10 +271,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
def shutdownClientConnection(remoteAddress: Address): Unit = { def shutdownClientConnection(remoteAddress: Address): Unit = {
clientsLock.writeLock().lock() clientsLock.writeLock().lock()
try { try {
remoteClients.remove(remoteAddress) match { remoteClients.remove(remoteAddress) foreach (_.shutdown())
case Some(client) client.shutdown()
case None false
}
} finally { } finally {
clientsLock.writeLock().unlock() clientsLock.writeLock().unlock()
} }
@ -283,10 +280,7 @@ private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider
def restartClientConnection(remoteAddress: Address): Unit = { def restartClientConnection(remoteAddress: Address): Unit = {
clientsLock.readLock().lock() clientsLock.readLock().lock()
try { try {
remoteClients.get(remoteAddress) match { remoteClients.get(remoteAddress) foreach (_.connect(reconnectIfAlreadyConnected = true))
case Some(client) client.connect(reconnectIfAlreadyConnected = true)
case None false
}
} finally { } finally {
clientsLock.readLock().unlock() clientsLock.readLock().unlock()
} }

View file

@ -386,6 +386,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
case ListenerReady(listener, _) case ListenerReady(listener, _)
listener notify InboundPayload(payload) listener notify InboundPayload(payload)
stay() stay()
case msg throw new AkkaProtocolException("unhandled message in Open state: " + msg, null)
} }
case _ stay() case _ stay()
@ -398,6 +399,7 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat
val handle = stateData match { val handle = stateData match {
case ListenerReady(_, wrappedHandle) wrappedHandle case ListenerReady(_, wrappedHandle) wrappedHandle
case AssociatedWaitHandler(_, wrappedHandle, _) wrappedHandle case AssociatedWaitHandler(_, wrappedHandle, _) wrappedHandle
case msg throw new AkkaProtocolException("unhandled message in Open state: " + msg, null)
} }
sendDisassociate(handle) sendDisassociate(handle)
stop() stop()

View file

@ -155,7 +155,7 @@ class TestTransport(
}) })
private[akka] def write(handle: TestAssociationHandle, payload: ByteString): Boolean = private[akka] def write(handle: TestAssociationHandle, payload: ByteString): Boolean =
Await.result(writeBehavior((handle, payload)), 3 seconds) Await.result(writeBehavior((handle, payload)), 3.seconds)
private[akka] def disassociate(handle: TestAssociationHandle): Unit = disassociateBehavior(handle) private[akka] def disassociate(handle: TestAssociationHandle): Unit = disassociateBehavior(handle)

View file

@ -333,7 +333,7 @@ private[transport] class ThrottledAssociation(
if (inboundThrottleMode == Unthrottled && !queue.isEmpty) self ! Dequeue if (inboundThrottleMode == Unthrottled && !queue.isEmpty) self ! Dequeue
else if (!queue.isEmpty) { else if (!queue.isEmpty) {
context.system.scheduler.scheduleOnce( context.system.scheduler.scheduleOnce(
inboundThrottleMode.timeToAvailable(System.nanoTime(), queue.head.length) nanoseconds, self, Dequeue) inboundThrottleMode.timeToAvailable(System.nanoTime(), queue.head.length).nanos, self, Dequeue)
} }
} }
stay() stay()
@ -376,7 +376,7 @@ private[transport] class ThrottledAssociation(
queue = queue.enqueue(payload) queue = queue.enqueue(payload)
context.system.scheduler.scheduleOnce( context.system.scheduler.scheduleOnce(
inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens) nanoseconds, self, Dequeue) inboundThrottleMode.timeToAvailable(System.nanoTime(), tokens).nanos, self, Dequeue)
} }
} else { } else {
queue = queue.enqueue(payload) queue = queue.enqueue(payload)
@ -419,4 +419,4 @@ private[transport] case class ThrottlerHandle(_wrappedHandle: AssociationHandle,
throttlerActor ! PoisonPill throttlerActor ! PoisonPill
} }
} }