Merge pull request #1033 from akka/wip-def-if-patriknw

Style change of def starting with if
This commit is contained in:
Patrik Nordwall 2013-01-18 04:30:43 -08:00
commit db5da848c0
25 changed files with 268 additions and 241 deletions

View file

@ -80,21 +80,21 @@ object ActorModelSpec {
}
def receive = {
case AwaitLatch(latch) ack; latch.await(); busy.switchOff()
case Meet(sign, wait) ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) ack; Thread.sleep(time); l.countDown(); busy.switchOff()
case Reply(msg) ack; sender ! msg; busy.switchOff()
case TryReply(msg) ack; sender.tell(msg, null); busy.switchOff()
case Forward(to, msg) ack; to.forward(msg); busy.switchOff()
case CountDown(latch) ack; latch.countDown(); busy.switchOff()
case Increment(count) ack; count.incrementAndGet(); busy.switchOff()
case CountDownNStop(l) ack; l.countDown(); context.stop(self); busy.switchOff()
case Restart ack; busy.switchOff(); throw new Exception("Restart requested")
case Interrupt ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!")
case InterruptNicely(msg) ack; sender ! msg; busy.switchOff(); Thread.currentThread().interrupt()
case ThrowException(e: Throwable) ack; busy.switchOff(); throw e
case DoubleStop ack; context.stop(self); context.stop(self); busy.switchOff
case AwaitLatch(latch) { ack; latch.await(); busy.switchOff() }
case Meet(sign, wait) { ack; sign.countDown(); wait.await(); busy.switchOff() }
case Wait(time) { ack; Thread.sleep(time); busy.switchOff() }
case WaitAck(time, l) { ack; Thread.sleep(time); l.countDown(); busy.switchOff() }
case Reply(msg) { ack; sender ! msg; busy.switchOff() }
case TryReply(msg) { ack; sender.tell(msg, null); busy.switchOff() }
case Forward(to, msg) { ack; to.forward(msg); busy.switchOff() }
case CountDown(latch) { ack; latch.countDown(); busy.switchOff() }
case Increment(count) { ack; count.incrementAndGet(); busy.switchOff() }
case CountDownNStop(l) { ack; l.countDown(); context.stop(self); busy.switchOff() }
case Restart { ack; busy.switchOff(); throw new Exception("Restart requested") }
case Interrupt { ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") }
case InterruptNicely(msg) { ack; sender ! msg; busy.switchOff(); Thread.currentThread().interrupt() }
case ThrowException(e: Throwable) { ack; busy.switchOff(); throw e }
case DoubleStop { ack; context.stop(self); context.stop(self); busy.switchOff }
}
}
@ -229,16 +229,17 @@ object ActorModelSpec {
}
}
@tailrec def await(until: Long)(condition: Boolean): Unit = if (System.currentTimeMillis() <= until) {
var done = false
try {
done = condition
if (!done) Thread.sleep(25)
} catch {
case e: InterruptedException
}
if (!done) await(until)(condition)
} else throw new AssertionError("await failed")
@tailrec def await(until: Long)(condition: Boolean): Unit =
if (System.currentTimeMillis() <= until) {
var done = false
try {
done = condition
if (!done) Thread.sleep(25)
} catch {
case e: InterruptedException
}
if (!done) await(until)(condition)
} else throw new AssertionError("await failed")
}
abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with DefaultTimeout {

View file

@ -402,23 +402,24 @@ private[akka] class ActorCell(
checkReceiveTimeout // Reschedule receive timeout
}
def autoReceiveMessage(msg: Envelope): Unit = if (msg.message != NullMessage) {
if (system.settings.DebugAutoReceive)
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
def autoReceiveMessage(msg: Envelope): Unit =
if (msg.message != NullMessage) {
if (system.settings.DebugAutoReceive)
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case Failed(cause, uid) handleFailure(sender, cause, uid)
case t: Terminated
if (t.addressTerminated) removeChildWhenToAddressTerminated(t.actor)
watchedActorTerminated(t)
case AddressTerminated(address) addressTerminated(address)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) getChildByName(name) match { case Some(c: ChildRestartStats) c.child.tell(m, msg.sender); case _ }
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
msg.message match {
case Failed(cause, uid) handleFailure(sender, cause, uid)
case t: Terminated
if (t.addressTerminated) removeChildWhenToAddressTerminated(t.actor)
watchedActorTerminated(t)
case AddressTerminated(address) addressTerminated(address)
case Kill throw new ActorKilledException("Kill")
case PoisonPill self.stop()
case SelectParent(m) parent.tell(m, msg.sender)
case SelectChildName(name, m) getChildByName(name) match { case Some(c: ChildRestartStats) c.child.tell(m, msg.sender); case _ }
case SelectChildPattern(p, m) for (c children if p.matcher(c.path.name).matches) c.tell(m, msg.sender)
}
}
}
/**
* When a parent is watching a child and it terminates due to AddressTerminated,
@ -507,16 +508,17 @@ private[akka] class ActorCell(
}
}
private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = if (!isTerminating) {
// Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure()
initChild(child) match {
case Some(crs)
crs.uid = uid
handleSupervise(child, async)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case None publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well"))
private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit =
if (!isTerminating) {
// Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure()
initChild(child) match {
case Some(crs)
crs.uid = uid
handleSupervise(child, async)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case None publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well"))
}
}
}
// future extension point
protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match {

View file

@ -382,7 +382,7 @@ class LocalActorRefProvider(
override def isTerminated: Boolean = stopped.isOn
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = stopped.ifOff(message match {
case Failed(ex, _) if sender ne null causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop()
case Failed(ex, _) if sender ne null { causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop() }
case NullMessage // do nothing
case _ log.error(this + " received unexpected message [" + message + "]")
})
@ -449,10 +449,11 @@ class LocalActorRefProvider(
stopWhenAllTerminationHooksDone()
}
def stopWhenAllTerminationHooksDone(): Unit = if (terminationHooks.isEmpty) {
eventStream.stopDefaultLoggers()
context.stop(self)
}
def stopWhenAllTerminationHooksDone(): Unit =
if (terminationHooks.isEmpty) {
eventStream.stopDefaultLoggers()
context.stop(self)
}
// guardian MUST NOT lose its children during restart
override def preRestart(cause: Throwable, msg: Option[Any]) {}

View file

@ -97,10 +97,11 @@ object FSM {
if (repeat) scheduler.schedule(timeout, timeout, actor, this)
else scheduler.scheduleOnce(timeout, actor, this))
def cancel(): Unit = if (ref.isDefined) {
ref.get.cancel()
ref = None
}
def cancel(): Unit =
if (ref.isDefined) {
ref.get.cancel()
ref = None
}
}
/**

View file

@ -40,12 +40,13 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
* When this actor is watching the subject of [[akka.actor.Terminated]] message
* it will be propagated to user's receive.
*/
protected def watchedActorTerminated(t: Terminated): Unit = if (watching.contains(t.actor)) {
maintainAddressTerminatedSubscription(t.actor) {
watching -= t.actor
protected def watchedActorTerminated(t: Terminated): Unit =
if (watching.contains(t.actor)) {
maintainAddressTerminatedSubscription(t.actor) {
watching -= t.actor
}
receiveMessage(t)
}
receiveMessage(t)
}
protected def tellWatchersWeDied(actor: Actor): Unit = {
if (!watchedBy.isEmpty) {

View file

@ -150,23 +150,24 @@ private[akka] object MessageDispatcher {
// since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1)
final val debug = false // Deliberately without type ascription to make it a compile-time constant
lazy val actors = new Index[MessageDispatcher, ActorRef](16, _ compareTo _)
def printActors: Unit = if (debug) {
for {
d actors.keys
a { println(d + " inhabitants: " + d.inhabitants); actors.valueIterator(d) }
} {
val status = if (a.isTerminated) " (terminated)" else " (alive)"
val messages = a match {
case r: ActorRefWithCell " " + r.underlying.numberOfMessages + " messages"
case _ " " + a.getClass
def printActors: Unit =
if (debug) {
for {
d actors.keys
a { println(d + " inhabitants: " + d.inhabitants); actors.valueIterator(d) }
} {
val status = if (a.isTerminated) " (terminated)" else " (alive)"
val messages = a match {
case r: ActorRefWithCell " " + r.underlying.numberOfMessages + " messages"
case _ " " + a.getClass
}
val parent = a match {
case i: InternalActorRef ", parent: " + i.getParent
case _ ""
}
println(" -> " + a + status + messages + parent)
}
val parent = a match {
case i: InternalActorRef ", parent: " + i.getParent
case _ ""
}
println(" -> " + a + status + messages + parent)
}
}
implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher
}

View file

@ -85,17 +85,18 @@ class BalancingDispatcher(
if (!registerForExecution(receiver.mailbox, false, false)) teamWork()
}
protected def teamWork(): Unit = if (attemptTeamWork) {
@tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit =
if (messageQueue.hasMessages
&& i.hasNext
&& (executorService.executor match {
case lm: LoadMetrics lm.atFullThrottle == false
case other true
})
&& !registerForExecution(i.next.mailbox, false, false))
scheduleOne(i)
protected def teamWork(): Unit =
if (attemptTeamWork) {
@tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit =
if (messageQueue.hasMessages
&& i.hasNext
&& (executorService.executor match {
case lm: LoadMetrics lm.atFullThrottle == false
case other true
})
&& !registerForExecution(i.next.mailbox, false, false))
scheduleOne(i)
scheduleOne()
}
scheduleOne()
}
}

View file

@ -285,12 +285,13 @@ class BoundedBlockingQueue[E <: AnyRef](
last = -1 //To avoid 2 subsequent removes without a next in between
lock.lock()
try {
@tailrec def removeTarget(i: Iterator[E] = backing.iterator()): Unit = if (i.hasNext) {
if (i.next eq target) {
i.remove()
notFull.signal()
} else removeTarget(i)
}
@tailrec def removeTarget(i: Iterator[E] = backing.iterator()): Unit =
if (i.hasNext) {
if (i.next eq target) {
i.remove()
notFull.signal()
} else removeTarget(i)
}
removeTarget()
} finally {

View file

@ -252,14 +252,15 @@ object ByteIterator {
normalize()
}
@tailrec final override def drop(n: Int): this.type = if ((n > 0) && !isEmpty) {
val nCurrent = math.min(n, current.len)
current.drop(n)
val rest = n - nCurrent
assert(current.isEmpty || (rest == 0))
normalize()
drop(rest)
} else this
@tailrec final override def drop(n: Int): this.type =
if ((n > 0) && !isEmpty) {
val nCurrent = math.min(n, current.len)
current.drop(n)
val rest = n - nCurrent
assert(current.isEmpty || (rest == 0))
normalize()
drop(rest)
} else this
final override def takeWhile(p: Byte Boolean): this.type = {
var stop = false
@ -275,12 +276,13 @@ object ByteIterator {
normalize()
}
@tailrec final override def dropWhile(p: Byte Boolean): this.type = if (!isEmpty) {
current.dropWhile(p)
val dropMore = current.isEmpty
normalize()
if (dropMore) dropWhile(p) else this
} else this
@tailrec final override def dropWhile(p: Byte Boolean): this.type =
if (!isEmpty) {
current.dropWhile(p)
val dropMore = current.isEmpty
normalize()
if (dropMore) dropWhile(p) else this
} else this
final override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = {
var pos = start
@ -309,19 +311,20 @@ object ByteIterator {
}
}
@tailrec protected final def getToArray[A](xs: Array[A], offset: Int, n: Int, elemSize: Int)(getSingle: A)(getMult: (Array[A], Int, Int) Unit): this.type = if (n <= 0) this else {
if (isEmpty) Iterator.empty.next
val nDone = if (current.len >= elemSize) {
val nCurrent = math.min(n, current.len / elemSize)
getMult(xs, offset, nCurrent)
nCurrent
} else {
xs(offset) = getSingle
1
@tailrec protected final def getToArray[A](xs: Array[A], offset: Int, n: Int, elemSize: Int)(getSingle: A)(getMult: (Array[A], Int, Int) Unit): this.type =
if (n <= 0) this else {
if (isEmpty) Iterator.empty.next
val nDone = if (current.len >= elemSize) {
val nCurrent = math.min(n, current.len / elemSize)
getMult(xs, offset, nCurrent)
nCurrent
} else {
xs(offset) = getSingle
1
}
normalize()
getToArray(xs, offset + nDone, n - nDone, elemSize)(getSingle)(getMult)
}
normalize()
getToArray(xs, offset + nDone, n - nDone, elemSize)(getSingle)(getMult)
}
def getBytes(xs: Array[Byte], offset: Int, n: Int): this.type =
getToArray(xs, offset, n, 1) { getByte } { current.getBytes(_, _, _) }
@ -359,16 +362,17 @@ object ByteIterator {
}
override def skip(n: Long): Long = {
@tailrec def skipImpl(n: Long, skipped: Long): Long = if (n > 0) {
if (!isEmpty) {
val m = current.asInputStream.skip(n)
normalize()
val newN = n - m
val newSkipped = skipped + m
if (newN > 0) skipImpl(newN, newSkipped)
else newSkipped
@tailrec def skipImpl(n: Long, skipped: Long): Long =
if (n > 0) {
if (!isEmpty) {
val m = current.asInputStream.skip(n)
normalize()
val newN = n - m
val newSkipped = skipped + m
if (newN > 0) skipImpl(newN, newSkipped)
else newSkipped
} else 0
} else 0
} else 0
skipImpl(n, 0)
}

View file

@ -38,12 +38,13 @@ private[akka] object Collections {
} else hasNext //Attempt to find the next
} else _hasNext // Return if we found one
override final def next(): To = if (hasNext) {
val ret = _next
_next = null.asInstanceOf[To] // Mark as consumed (nice to the GC, don't leak the last returned value)
_hasNext = false // Mark as consumed (we need to look for the next value)
ret
} else throw new java.util.NoSuchElementException("next")
override final def next(): To =
if (hasNext) {
val ret = _next
_next = null.asInstanceOf[To] // Mark as consumed (nice to the GC, don't leak the last returned value)
_hasNext = false // Mark as consumed (we need to look for the next value)
ret
} else throw new java.util.NoSuchElementException("next")
}
}

View file

@ -803,8 +803,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto
def oneWayGossipTo(address: Address): Unit =
gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false))
def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress)
coreSender ! SendClusterMessage(address, gossipMsg)
def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit =
if (address != selfAddress)
coreSender ! SendClusterMessage(address, gossipMsg)
def publish(newGossip: Gossip): Unit = {
publisher ! PublishChanges(newGossip)

View file

@ -143,17 +143,14 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg
case ExpectedFirstHeartbeat(from) triggerFirstHeartbeat(from)
}
def reset(snapshot: InstantClusterState): Unit =
state = state.reset(snapshot.members.map(_.address))
def reset(snapshot: InstantClusterState): Unit = state = state.reset(snapshot.members.map(_.address))
def addMember(m: Member): Unit = if (m.address != selfAddress)
state = state addMember m.address
def addMember(m: Member): Unit = if (m.address != selfAddress) state = state addMember m.address
def removeMember(m: Member): Unit = if (m.address != selfAddress)
state = state removeMember m.address
def removeMember(m: Member): Unit = if (m.address != selfAddress) state = state removeMember m.address
def addHeartbeatRequest(address: Address): Unit = if (address != selfAddress)
state = state.addHeartbeatRequest(address, Deadline.now + HeartbeatRequestTimeToLive)
def addHeartbeatRequest(address: Address): Unit =
if (address != selfAddress) state = state.addHeartbeatRequest(address, Deadline.now + HeartbeatRequestTimeToLive)
def sendHeartbeatRequest(address: Address): Unit =
if (!cluster.failureDetector.isMonitoring(address) && state.ring.mySenders.contains(address)) {

View file

@ -307,12 +307,13 @@ case class Metric private (name: String, value: Number, private val average: Opt
* If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new
* data point, and if defined, updates the data stream. Returns the updated metric.
*/
def :+(latest: Metric): Metric = if (this sameAs latest) average match {
case Some(avg) copy(value = latest.value, average = Some(avg :+ latest.value.doubleValue))
case None if latest.average.isDefined copy(value = latest.value, average = latest.average)
case _ copy(value = latest.value)
}
else this
def :+(latest: Metric): Metric =
if (this sameAs latest) average match {
case Some(avg) copy(value = latest.value, average = Some(avg :+ latest.value.doubleValue))
case None if latest.average.isDefined copy(value = latest.value, average = latest.average)
case _ copy(value = latest.value)
}
else this
/**
* The numerical value of the average, if defined, otherwise the latest value

View file

@ -143,8 +143,8 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
/**
* Unsubscribe to cluster events.
*/
def close(): Unit = if (!eventBusListener.isTerminated) {
eventBusListener ! PoisonPill
}
def close(): Unit =
if (!eventBusListener.isTerminated)
eventBusListener ! PoisonPill
}

View file

@ -81,13 +81,13 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
}
}
def muteMarkingAsUnreachable(sys: ActorSystem = system): Unit = if (!sys.log.isDebugEnabled) {
sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*")))
}
def muteMarkingAsUnreachable(sys: ActorSystem = system): Unit =
if (!sys.log.isDebugEnabled)
sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*")))
def muteDeadLetters(sys: ActorSystem = system): Unit = if (!sys.log.isDebugEnabled) {
sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*")))
}
def muteDeadLetters(sys: ActorSystem = system): Unit =
if (!sys.log.isDebugEnabled)
sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*")))
override def afterAll(): Unit = {
if (!log.isDebugEnabled) {
@ -201,15 +201,16 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS
* out of all nodes in the cluster. First
* member in the cluster ring is expected leader.
*/
def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) {
nodesInCluster.length must not be (0)
val expectedLeader = roleOfLeader(nodesInCluster)
val leader = clusterView.leader
val isLeader = leader == Some(clusterView.selfAddress)
assert(isLeader == isNode(expectedLeader),
"expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members))
clusterView.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving))
}
def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit =
if (nodesInCluster.contains(myself)) {
nodesInCluster.length must not be (0)
val expectedLeader = roleOfLeader(nodesInCluster)
val leader = clusterView.leader
val isLeader = leader == Some(clusterView.selfAddress)
assert(isLeader == isNode(expectedLeader),
"expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members))
clusterView.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving))
}
/**
* Wait until the expected number of members has status Up and convergence has been reached.

View file

@ -435,11 +435,12 @@ object StressMultiJvmSpec extends MultiNodeConfig {
case RetryTick resend()
}
def done(replyTo: ActorRef): Unit = if (outstanding.isEmpty) {
val duration = (System.nanoTime - startTime).nanos
replyTo ! WorkResult(duration, sendCounter, ackCounter)
context stop self
}
def done(replyTo: ActorRef): Unit =
if (outstanding.isEmpty) {
val duration = (System.nanoTime - startTime).nanos
replyTo ! WorkResult(duration, sendCounter, ackCounter)
context stop self
}
def sendJobs(): Unit = {
0 until settings.workBatchSize foreach { _

View file

@ -125,9 +125,10 @@ trait Conductor { this: TestConductorExt ⇒
def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] =
throttle(node, target, direction, 0f)
private def requireTestConductorTranport(): Unit = if (!transport.defaultAddress.protocol.contains(".gremlin.trttl."))
throw new ConfigurationException("To use this feature you must activate the failure injector adapters " +
"(gremlin, trttl) by specifying `testTransport(on = true)` in your MultiNodeConfig.")
private def requireTestConductorTranport(): Unit =
if (!transport.defaultAddress.protocol.contains(".gremlin.trttl."))
throw new ConfigurationException("To use this feature you must activate the failure injector adapters " +
"(gremlin, trttl) by specifying `testTransport(on = true)` in your MultiNodeConfig.")
/**
* Switch the Netty pipeline of the remote support into pass through mode for
@ -377,7 +378,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP
case BarrierTimeout(data) failBarrier(data)
case FailedBarrier(data) failBarrier(data)
case BarrierEmpty(data, msg) SupervisorStrategy.Resume
case WrongBarrier(name, client, data) client ! ToClient(BarrierResult(name, false)); failBarrier(data)
case WrongBarrier(name, client, data) { client ! ToClient(BarrierResult(name, false)); failBarrier(data) }
case ClientLost(data, node) failBarrier(data)
case DuplicateNode(data, node) failBarrier(data)
}

View file

@ -100,10 +100,11 @@ class LogRoleReplace {
def process(in: BufferedReader, out: PrintWriter): Unit = {
@tailrec
def processLines(line: String): Unit = if (line ne null) {
out.println(processLine(line))
processLines(in.readLine)
}
def processLines(line: String): Unit =
if (line ne null) {
out.println(processLine(line))
processLines(in.readLine)
}
processLines(in.readLine())
}

View file

@ -298,17 +298,18 @@ private[remote] object EndpointManager {
endpoint
}
def unregisterEndpoint(endpoint: ActorRef): Unit = if (isWritable(endpoint)) {
val address = writableToAddress(endpoint)
addressToWritable.get(address) match {
case Some(policy) if policy.isTombstone // There is already a tombstone directive, leave it there
case _ addressToWritable -= address
def unregisterEndpoint(endpoint: ActorRef): Unit =
if (isWritable(endpoint)) {
val address = writableToAddress(endpoint)
addressToWritable.get(address) match {
case Some(policy) if policy.isTombstone // There is already a tombstone directive, leave it there
case _ addressToWritable -= address
}
writableToAddress -= endpoint
} else if (isReadOnly(endpoint)) {
addressToReadonly -= readonlyToAddress(endpoint)
readonlyToAddress -= endpoint
}
writableToAddress -= endpoint
} else if (isReadOnly(endpoint)) {
addressToReadonly -= readonlyToAddress(endpoint)
readonlyToAddress -= endpoint
}
def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address)
@ -328,13 +329,14 @@ private[remote] object EndpointManager {
case _ false
}
def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit = if (isWritable(endpoint)) {
addressToWritable += writableToAddress(endpoint) -> Gated(timeOfRelease)
writableToAddress -= endpoint
} else if (isReadOnly(endpoint)) {
addressToReadonly -= readonlyToAddress(endpoint)
readonlyToAddress -= endpoint
}
def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit =
if (isWritable(endpoint)) {
addressToWritable += writableToAddress(endpoint) -> Gated(timeOfRelease)
writableToAddress -= endpoint
} else if (isReadOnly(endpoint)) {
addressToReadonly -= readonlyToAddress(endpoint)
readonlyToAddress -= endpoint
}
def markAsQuarantined(address: Address, reason: Throwable): Unit = addressToWritable += address -> Quarantined(reason)

View file

@ -48,9 +48,10 @@ trait SchemeAugmenter {
protected def augmentScheme(address: Address): Address = address.copy(protocol = augmentScheme(address.protocol))
protected def removeScheme(scheme: String): String = if (scheme.endsWith(s".$addedSchemeIdentifier"))
scheme.take(scheme.length - addedSchemeIdentifier.length - 1)
else scheme
protected def removeScheme(scheme: String): String =
if (scheme.endsWith(s".$addedSchemeIdentifier"))
scheme.take(scheme.length - addedSchemeIdentifier.length - 1)
else scheme
protected def removeScheme(address: Address): Address = address.copy(protocol = removeScheme(address.protocol))
}

View file

@ -135,7 +135,8 @@ private[remote] case class FailureInjectorHandle(_wrappedHandle: AssociationHand
override def disassociate(): Unit = wrappedHandle.disassociate()
override def notify(ev: HandleEvent): Unit = if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress))
upstreamListener notify ev
override def notify(ev: HandleEvent): Unit =
if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress))
upstreamListener notify ev
}

View file

@ -70,9 +70,10 @@ object ThrottlerTransportAdapter {
case class TokenBucket(capacity: Int, tokensPerSecond: Double, nanoTimeOfLastSend: Long, availableTokens: Int)
extends ThrottleMode {
private def isAvailable(nanoTimeOfSend: Long, tokens: Int): Boolean = if ((tokens > capacity && availableTokens > 0)) {
true // Allow messages larger than capacity through, it will be recorded as negative tokens
} else min((availableTokens + tokensGenerated(nanoTimeOfSend)), capacity) >= tokens
private def isAvailable(nanoTimeOfSend: Long, tokens: Int): Boolean =
if ((tokens > capacity && availableTokens > 0)) {
true // Allow messages larger than capacity through, it will be recorded as negative tokens
} else min((availableTokens + tokensGenerated(nanoTimeOfSend)), capacity) >= tokens
override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = {
if (isAvailable(nanoTimeOfSend, tokens))

View file

@ -68,10 +68,11 @@ private[remote] class TcpAssociationHandle(val localAddress: Address, val remote
override val readHandlerPromise: Promise[HandleEventListener] = Promise()
override def write(payload: ByteString): Boolean = if (channel.isWritable && channel.isOpen) {
channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer))
true
} else false
override def write(payload: ByteString): Boolean =
if (channel.isWritable && channel.isOpen) {
channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer))
true
} else false
override def disassociate(): Unit = NettyTransport.gracefulClose(channel)
}

View file

@ -90,32 +90,35 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re
(new TestFailureDetector, registry, transport, handle)
}
def lastActivityIsHeartbeat(registry: AssociationRegistry) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match {
case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress
codec.decodePdu(payload) match {
case Heartbeat true
case _ false
}
case _ false
}
def lastActivityIsHeartbeat(registry: AssociationRegistry) =
if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match {
case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress
codec.decodePdu(payload) match {
case Heartbeat true
case _ false
}
case _ false
}
def lastActivityIsAssociate(registry: AssociationRegistry, cookie: Option[String]) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match {
case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress
codec.decodePdu(payload) match {
case Associate(c, origin) if c == cookie && origin == localAddress true
case _ false
}
case _ false
}
def lastActivityIsAssociate(registry: AssociationRegistry, cookie: Option[String]) =
if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match {
case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress
codec.decodePdu(payload) match {
case Associate(c, origin) if c == cookie && origin == localAddress true
case _ false
}
case _ false
}
def lastActivityIsDisassociate(registry: AssociationRegistry) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match {
case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress
codec.decodePdu(payload) match {
case Disassociate true
case _ false
}
case _ false
}
def lastActivityIsDisassociate(registry: AssociationRegistry) =
if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match {
case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress
codec.decodePdu(payload) match {
case Disassociate true
case _ false
}
case _ false
}
"ProtocolStateActor" must {

View file

@ -25,10 +25,11 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false)
val nonExistingAddress = Address("test." + schemeIdentifier, "nosystem", "nohost", 0)
def freshTransport(testTransport: TestTransport): Transport
def wrapTransport(transport: Transport): Transport = if (withAkkaProtocol) {
val provider = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider]
new AkkaProtocolTransport(transport, system, new AkkaProtocolSettings(provider.remoteSettings.config), AkkaPduProtobufCodec)
} else transport
def wrapTransport(transport: Transport): Transport =
if (withAkkaProtocol) {
val provider = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider]
new AkkaProtocolTransport(transport, system, new AkkaProtocolSettings(provider.remoteSettings.config), AkkaPduProtobufCodec)
} else transport
def newTransportA(registry: AssociationRegistry): Transport =
wrapTransport(freshTransport(new TestTransport(addressATest, registry)))