diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 3fa308ca3e..0073857809 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -80,21 +80,21 @@ object ActorModelSpec { } def receive = { - case AwaitLatch(latch) ⇒ ack; latch.await(); busy.switchOff() - case Meet(sign, wait) ⇒ ack; sign.countDown(); wait.await(); busy.switchOff() - case Wait(time) ⇒ ack; Thread.sleep(time); busy.switchOff() - case WaitAck(time, l) ⇒ ack; Thread.sleep(time); l.countDown(); busy.switchOff() - case Reply(msg) ⇒ ack; sender ! msg; busy.switchOff() - case TryReply(msg) ⇒ ack; sender.tell(msg, null); busy.switchOff() - case Forward(to, msg) ⇒ ack; to.forward(msg); busy.switchOff() - case CountDown(latch) ⇒ ack; latch.countDown(); busy.switchOff() - case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff() - case CountDownNStop(l) ⇒ ack; l.countDown(); context.stop(self); busy.switchOff() - case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested") - case Interrupt ⇒ ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") - case InterruptNicely(msg) ⇒ ack; sender ! msg; busy.switchOff(); Thread.currentThread().interrupt() - case ThrowException(e: Throwable) ⇒ ack; busy.switchOff(); throw e - case DoubleStop ⇒ ack; context.stop(self); context.stop(self); busy.switchOff + case AwaitLatch(latch) ⇒ { ack; latch.await(); busy.switchOff() } + case Meet(sign, wait) ⇒ { ack; sign.countDown(); wait.await(); busy.switchOff() } + case Wait(time) ⇒ { ack; Thread.sleep(time); busy.switchOff() } + case WaitAck(time, l) ⇒ { ack; Thread.sleep(time); l.countDown(); busy.switchOff() } + case Reply(msg) ⇒ { ack; sender ! msg; busy.switchOff() } + case TryReply(msg) ⇒ { ack; sender.tell(msg, null); busy.switchOff() } + case Forward(to, msg) ⇒ { ack; to.forward(msg); busy.switchOff() } + case CountDown(latch) ⇒ { ack; latch.countDown(); busy.switchOff() } + case Increment(count) ⇒ { ack; count.incrementAndGet(); busy.switchOff() } + case CountDownNStop(l) ⇒ { ack; l.countDown(); context.stop(self); busy.switchOff() } + case Restart ⇒ { ack; busy.switchOff(); throw new Exception("Restart requested") } + case Interrupt ⇒ { ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!") } + case InterruptNicely(msg) ⇒ { ack; sender ! msg; busy.switchOff(); Thread.currentThread().interrupt() } + case ThrowException(e: Throwable) ⇒ { ack; busy.switchOff(); throw e } + case DoubleStop ⇒ { ack; context.stop(self); context.stop(self); busy.switchOff } } } @@ -229,16 +229,17 @@ object ActorModelSpec { } } - @tailrec def await(until: Long)(condition: ⇒ Boolean): Unit = if (System.currentTimeMillis() <= until) { - var done = false - try { - done = condition - if (!done) Thread.sleep(25) - } catch { - case e: InterruptedException ⇒ - } - if (!done) await(until)(condition) - } else throw new AssertionError("await failed") + @tailrec def await(until: Long)(condition: ⇒ Boolean): Unit = + if (System.currentTimeMillis() <= until) { + var done = false + try { + done = condition + if (!done) Thread.sleep(25) + } catch { + case e: InterruptedException ⇒ + } + if (!done) await(until)(condition) + } else throw new AssertionError("await failed") } abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with DefaultTimeout { diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 7b216969ba..f1bf040d2a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -402,23 +402,24 @@ private[akka] class ActorCell( checkReceiveTimeout // Reschedule receive timeout } - def autoReceiveMessage(msg: Envelope): Unit = if (msg.message != NullMessage) { - if (system.settings.DebugAutoReceive) - publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) + def autoReceiveMessage(msg: Envelope): Unit = + if (msg.message != NullMessage) { + if (system.settings.DebugAutoReceive) + publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) - msg.message match { - case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) - case t: Terminated ⇒ - if (t.addressTerminated) removeChildWhenToAddressTerminated(t.actor) - watchedActorTerminated(t) - case AddressTerminated(address) ⇒ addressTerminated(address) - case Kill ⇒ throw new ActorKilledException("Kill") - case PoisonPill ⇒ self.stop() - case SelectParent(m) ⇒ parent.tell(m, msg.sender) - case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } - case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) + msg.message match { + case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) + case t: Terminated ⇒ + if (t.addressTerminated) removeChildWhenToAddressTerminated(t.actor) + watchedActorTerminated(t) + case AddressTerminated(address) ⇒ addressTerminated(address) + case Kill ⇒ throw new ActorKilledException("Kill") + case PoisonPill ⇒ self.stop() + case SelectParent(m) ⇒ parent.tell(m, msg.sender) + case SelectChildName(name, m) ⇒ getChildByName(name) match { case Some(c: ChildRestartStats) ⇒ c.child.tell(m, msg.sender); case _ ⇒ } + case SelectChildPattern(p, m) ⇒ for (c ← children if p.matcher(c.path.name).matches) c.tell(m, msg.sender) + } } - } /** * When a parent is watching a child and it terminates due to AddressTerminated, @@ -507,16 +508,17 @@ private[akka] class ActorCell( } } - private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = if (!isTerminating) { - // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() - initChild(child) match { - case Some(crs) ⇒ - crs.uid = uid - handleSupervise(child, async) - if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) - case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well")) + private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = + if (!isTerminating) { + // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() + initChild(child) match { + case Some(crs) ⇒ + crs.uid = uid + handleSupervise(child, async) + if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) + case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well")) + } } - } // future extension point protected def handleSupervise(child: ActorRef, async: Boolean): Unit = child match { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index f1a8d95fd0..184478c727 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -382,7 +382,7 @@ class LocalActorRefProvider( override def isTerminated: Boolean = stopped.isOn override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = stopped.ifOff(message match { - case Failed(ex, _) if sender ne null ⇒ causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop() + case Failed(ex, _) if sender ne null ⇒ { causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop() } case NullMessage ⇒ // do nothing case _ ⇒ log.error(this + " received unexpected message [" + message + "]") }) @@ -449,10 +449,11 @@ class LocalActorRefProvider( stopWhenAllTerminationHooksDone() } - def stopWhenAllTerminationHooksDone(): Unit = if (terminationHooks.isEmpty) { - eventStream.stopDefaultLoggers() - context.stop(self) - } + def stopWhenAllTerminationHooksDone(): Unit = + if (terminationHooks.isEmpty) { + eventStream.stopDefaultLoggers() + context.stop(self) + } // guardian MUST NOT lose its children during restart override def preRestart(cause: Throwable, msg: Option[Any]) {} diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index ad0e0bf7f0..d6f6ac488f 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -97,10 +97,11 @@ object FSM { if (repeat) scheduler.schedule(timeout, timeout, actor, this) else scheduler.scheduleOnce(timeout, actor, this)) - def cancel(): Unit = if (ref.isDefined) { - ref.get.cancel() - ref = None - } + def cancel(): Unit = + if (ref.isDefined) { + ref.get.cancel() + ref = None + } } /** diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index eb191ff415..017a880b1a 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -40,12 +40,13 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ * When this actor is watching the subject of [[akka.actor.Terminated]] message * it will be propagated to user's receive. */ - protected def watchedActorTerminated(t: Terminated): Unit = if (watching.contains(t.actor)) { - maintainAddressTerminatedSubscription(t.actor) { - watching -= t.actor + protected def watchedActorTerminated(t: Terminated): Unit = + if (watching.contains(t.actor)) { + maintainAddressTerminatedSubscription(t.actor) { + watching -= t.actor + } + receiveMessage(t) } - receiveMessage(t) - } protected def tellWatchersWeDied(actor: Actor): Unit = { if (!watchedBy.isEmpty) { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index ff59242c39..57f905ece8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -150,23 +150,24 @@ private[akka] object MessageDispatcher { // since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1) final val debug = false // Deliberately without type ascription to make it a compile-time constant lazy val actors = new Index[MessageDispatcher, ActorRef](16, _ compareTo _) - def printActors: Unit = if (debug) { - for { - d ← actors.keys - a ← { println(d + " inhabitants: " + d.inhabitants); actors.valueIterator(d) } - } { - val status = if (a.isTerminated) " (terminated)" else " (alive)" - val messages = a match { - case r: ActorRefWithCell ⇒ " " + r.underlying.numberOfMessages + " messages" - case _ ⇒ " " + a.getClass + def printActors: Unit = + if (debug) { + for { + d ← actors.keys + a ← { println(d + " inhabitants: " + d.inhabitants); actors.valueIterator(d) } + } { + val status = if (a.isTerminated) " (terminated)" else " (alive)" + val messages = a match { + case r: ActorRefWithCell ⇒ " " + r.underlying.numberOfMessages + " messages" + case _ ⇒ " " + a.getClass + } + val parent = a match { + case i: InternalActorRef ⇒ ", parent: " + i.getParent + case _ ⇒ "" + } + println(" -> " + a + status + messages + parent) } - val parent = a match { - case i: InternalActorRef ⇒ ", parent: " + i.getParent - case _ ⇒ "" - } - println(" -> " + a + status + messages + parent) } - } implicit def defaultDispatcher(implicit system: ActorSystem): MessageDispatcher = system.dispatcher } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 2190130440..895d48a0e7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -85,17 +85,18 @@ class BalancingDispatcher( if (!registerForExecution(receiver.mailbox, false, false)) teamWork() } - protected def teamWork(): Unit = if (attemptTeamWork) { - @tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit = - if (messageQueue.hasMessages - && i.hasNext - && (executorService.executor match { - case lm: LoadMetrics ⇒ lm.atFullThrottle == false - case other ⇒ true - }) - && !registerForExecution(i.next.mailbox, false, false)) - scheduleOne(i) + protected def teamWork(): Unit = + if (attemptTeamWork) { + @tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit = + if (messageQueue.hasMessages + && i.hasNext + && (executorService.executor match { + case lm: LoadMetrics ⇒ lm.atFullThrottle == false + case other ⇒ true + }) + && !registerForExecution(i.next.mailbox, false, false)) + scheduleOne(i) - scheduleOne() - } + scheduleOne() + } } diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index b6167ffece..24fc20b824 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -285,12 +285,13 @@ class BoundedBlockingQueue[E <: AnyRef]( last = -1 //To avoid 2 subsequent removes without a next in between lock.lock() try { - @tailrec def removeTarget(i: Iterator[E] = backing.iterator()): Unit = if (i.hasNext) { - if (i.next eq target) { - i.remove() - notFull.signal() - } else removeTarget(i) - } + @tailrec def removeTarget(i: Iterator[E] = backing.iterator()): Unit = + if (i.hasNext) { + if (i.next eq target) { + i.remove() + notFull.signal() + } else removeTarget(i) + } removeTarget() } finally { diff --git a/akka-actor/src/main/scala/akka/util/ByteIterator.scala b/akka-actor/src/main/scala/akka/util/ByteIterator.scala index 49292bb877..ded49f63a6 100644 --- a/akka-actor/src/main/scala/akka/util/ByteIterator.scala +++ b/akka-actor/src/main/scala/akka/util/ByteIterator.scala @@ -252,14 +252,15 @@ object ByteIterator { normalize() } - @tailrec final override def drop(n: Int): this.type = if ((n > 0) && !isEmpty) { - val nCurrent = math.min(n, current.len) - current.drop(n) - val rest = n - nCurrent - assert(current.isEmpty || (rest == 0)) - normalize() - drop(rest) - } else this + @tailrec final override def drop(n: Int): this.type = + if ((n > 0) && !isEmpty) { + val nCurrent = math.min(n, current.len) + current.drop(n) + val rest = n - nCurrent + assert(current.isEmpty || (rest == 0)) + normalize() + drop(rest) + } else this final override def takeWhile(p: Byte ⇒ Boolean): this.type = { var stop = false @@ -275,12 +276,13 @@ object ByteIterator { normalize() } - @tailrec final override def dropWhile(p: Byte ⇒ Boolean): this.type = if (!isEmpty) { - current.dropWhile(p) - val dropMore = current.isEmpty - normalize() - if (dropMore) dropWhile(p) else this - } else this + @tailrec final override def dropWhile(p: Byte ⇒ Boolean): this.type = + if (!isEmpty) { + current.dropWhile(p) + val dropMore = current.isEmpty + normalize() + if (dropMore) dropWhile(p) else this + } else this final override def copyToArray[B >: Byte](xs: Array[B], start: Int, len: Int): Unit = { var pos = start @@ -309,19 +311,20 @@ object ByteIterator { } } - @tailrec protected final def getToArray[A](xs: Array[A], offset: Int, n: Int, elemSize: Int)(getSingle: ⇒ A)(getMult: (Array[A], Int, Int) ⇒ Unit): this.type = if (n <= 0) this else { - if (isEmpty) Iterator.empty.next - val nDone = if (current.len >= elemSize) { - val nCurrent = math.min(n, current.len / elemSize) - getMult(xs, offset, nCurrent) - nCurrent - } else { - xs(offset) = getSingle - 1 + @tailrec protected final def getToArray[A](xs: Array[A], offset: Int, n: Int, elemSize: Int)(getSingle: ⇒ A)(getMult: (Array[A], Int, Int) ⇒ Unit): this.type = + if (n <= 0) this else { + if (isEmpty) Iterator.empty.next + val nDone = if (current.len >= elemSize) { + val nCurrent = math.min(n, current.len / elemSize) + getMult(xs, offset, nCurrent) + nCurrent + } else { + xs(offset) = getSingle + 1 + } + normalize() + getToArray(xs, offset + nDone, n - nDone, elemSize)(getSingle)(getMult) } - normalize() - getToArray(xs, offset + nDone, n - nDone, elemSize)(getSingle)(getMult) - } def getBytes(xs: Array[Byte], offset: Int, n: Int): this.type = getToArray(xs, offset, n, 1) { getByte } { current.getBytes(_, _, _) } @@ -359,16 +362,17 @@ object ByteIterator { } override def skip(n: Long): Long = { - @tailrec def skipImpl(n: Long, skipped: Long): Long = if (n > 0) { - if (!isEmpty) { - val m = current.asInputStream.skip(n) - normalize() - val newN = n - m - val newSkipped = skipped + m - if (newN > 0) skipImpl(newN, newSkipped) - else newSkipped + @tailrec def skipImpl(n: Long, skipped: Long): Long = + if (n > 0) { + if (!isEmpty) { + val m = current.asInputStream.skip(n) + normalize() + val newN = n - m + val newSkipped = skipped + m + if (newN > 0) skipImpl(newN, newSkipped) + else newSkipped + } else 0 } else 0 - } else 0 skipImpl(n, 0) } diff --git a/akka-actor/src/main/scala/akka/util/Collections.scala b/akka-actor/src/main/scala/akka/util/Collections.scala index f7a6368f45..293556861b 100644 --- a/akka-actor/src/main/scala/akka/util/Collections.scala +++ b/akka-actor/src/main/scala/akka/util/Collections.scala @@ -38,12 +38,13 @@ private[akka] object Collections { } else hasNext //Attempt to find the next } else _hasNext // Return if we found one - override final def next(): To = if (hasNext) { - val ret = _next - _next = null.asInstanceOf[To] // Mark as consumed (nice to the GC, don't leak the last returned value) - _hasNext = false // Mark as consumed (we need to look for the next value) - ret - } else throw new java.util.NoSuchElementException("next") + override final def next(): To = + if (hasNext) { + val ret = _next + _next = null.asInstanceOf[To] // Mark as consumed (nice to the GC, don't leak the last returned value) + _hasNext = false // Mark as consumed (we need to look for the next value) + ret + } else throw new java.util.NoSuchElementException("next") } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 5271bd6cd5..b2ecc0fbdb 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -803,8 +803,9 @@ private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Acto def oneWayGossipTo(address: Address): Unit = gossipTo(address, GossipEnvelope(selfAddress, latestGossip, conversation = false)) - def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = if (address != selfAddress) - coreSender ! SendClusterMessage(address, gossipMsg) + def gossipTo(address: Address, gossipMsg: GossipEnvelope): Unit = + if (address != selfAddress) + coreSender ! SendClusterMessage(address, gossipMsg) def publish(newGossip: Gossip): Unit = { publisher ! PublishChanges(newGossip) diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala index 81ac04e64c..f46b611c44 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterHeartbeat.scala @@ -143,17 +143,14 @@ private[cluster] final class ClusterHeartbeatSender extends Actor with ActorLogg case ExpectedFirstHeartbeat(from) ⇒ triggerFirstHeartbeat(from) } - def reset(snapshot: InstantClusterState): Unit = - state = state.reset(snapshot.members.map(_.address)) + def reset(snapshot: InstantClusterState): Unit = state = state.reset(snapshot.members.map(_.address)) - def addMember(m: Member): Unit = if (m.address != selfAddress) - state = state addMember m.address + def addMember(m: Member): Unit = if (m.address != selfAddress) state = state addMember m.address - def removeMember(m: Member): Unit = if (m.address != selfAddress) - state = state removeMember m.address + def removeMember(m: Member): Unit = if (m.address != selfAddress) state = state removeMember m.address - def addHeartbeatRequest(address: Address): Unit = if (address != selfAddress) - state = state.addHeartbeatRequest(address, Deadline.now + HeartbeatRequestTimeToLive) + def addHeartbeatRequest(address: Address): Unit = + if (address != selfAddress) state = state.addHeartbeatRequest(address, Deadline.now + HeartbeatRequestTimeToLive) def sendHeartbeatRequest(address: Address): Unit = if (!cluster.failureDetector.isMonitoring(address) && state.ring.mySenders.contains(address)) { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala index bd093c9f74..4c3f70108a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterMetricsCollector.scala @@ -307,12 +307,13 @@ case class Metric private (name: String, value: Number, private val average: Opt * If defined ( [[akka.cluster.MetricNumericConverter.defined()]] ), updates the new * data point, and if defined, updates the data stream. Returns the updated metric. */ - def :+(latest: Metric): Metric = if (this sameAs latest) average match { - case Some(avg) ⇒ copy(value = latest.value, average = Some(avg :+ latest.value.doubleValue)) - case None if latest.average.isDefined ⇒ copy(value = latest.value, average = latest.average) - case _ ⇒ copy(value = latest.value) - } - else this + def :+(latest: Metric): Metric = + if (this sameAs latest) average match { + case Some(avg) ⇒ copy(value = latest.value, average = Some(avg :+ latest.value.doubleValue)) + case None if latest.average.isDefined ⇒ copy(value = latest.value, average = latest.average) + case _ ⇒ copy(value = latest.value) + } + else this /** * The numerical value of the average, if defined, otherwise the latest value diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 571148af41..af69f977e0 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -143,8 +143,8 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { /** * Unsubscribe to cluster events. */ - def close(): Unit = if (!eventBusListener.isTerminated) { - eventBusListener ! PoisonPill - } + def close(): Unit = + if (!eventBusListener.isTerminated) + eventBusListener ! PoisonPill } diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index fe8deb887d..1027a14279 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -81,13 +81,13 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS } } - def muteMarkingAsUnreachable(sys: ActorSystem = system): Unit = if (!sys.log.isDebugEnabled) { - sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*"))) - } + def muteMarkingAsUnreachable(sys: ActorSystem = system): Unit = + if (!sys.log.isDebugEnabled) + sys.eventStream.publish(Mute(EventFilter.error(pattern = ".*Marking.* as UNREACHABLE.*"))) - def muteDeadLetters(sys: ActorSystem = system): Unit = if (!sys.log.isDebugEnabled) { - sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*"))) - } + def muteDeadLetters(sys: ActorSystem = system): Unit = + if (!sys.log.isDebugEnabled) + sys.eventStream.publish(Mute(EventFilter.warning(pattern = ".*received dead letter from.*"))) override def afterAll(): Unit = { if (!log.isDebugEnabled) { @@ -201,15 +201,16 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec { self: MultiNodeS * out of all nodes in the cluster. First * member in the cluster ring is expected leader. */ - def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit = if (nodesInCluster.contains(myself)) { - nodesInCluster.length must not be (0) - val expectedLeader = roleOfLeader(nodesInCluster) - val leader = clusterView.leader - val isLeader = leader == Some(clusterView.selfAddress) - assert(isLeader == isNode(expectedLeader), - "expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members)) - clusterView.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving)) - } + def assertLeaderIn(nodesInCluster: immutable.Seq[RoleName]): Unit = + if (nodesInCluster.contains(myself)) { + nodesInCluster.length must not be (0) + val expectedLeader = roleOfLeader(nodesInCluster) + val leader = clusterView.leader + val isLeader = leader == Some(clusterView.selfAddress) + assert(isLeader == isNode(expectedLeader), + "expectedLeader [%s], got leader [%s], members [%s]".format(expectedLeader, leader, clusterView.members)) + clusterView.status must (be(MemberStatus.Up) or be(MemberStatus.Leaving)) + } /** * Wait until the expected number of members has status Up and convergence has been reached. diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index e2d8c19daf..16a163c026 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -435,11 +435,12 @@ object StressMultiJvmSpec extends MultiNodeConfig { case RetryTick ⇒ resend() } - def done(replyTo: ActorRef): Unit = if (outstanding.isEmpty) { - val duration = (System.nanoTime - startTime).nanos - replyTo ! WorkResult(duration, sendCounter, ackCounter) - context stop self - } + def done(replyTo: ActorRef): Unit = + if (outstanding.isEmpty) { + val duration = (System.nanoTime - startTime).nanos + replyTo ! WorkResult(duration, sendCounter, ackCounter) + context stop self + } def sendJobs(): Unit = { 0 until settings.workBatchSize foreach { _ ⇒ diff --git a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala index 5ea1b2e8cb..8794504616 100644 --- a/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala +++ b/akka-remote-tests/src/main/scala/akka/remote/testconductor/Conductor.scala @@ -125,9 +125,10 @@ trait Conductor { this: TestConductorExt ⇒ def blackhole(node: RoleName, target: RoleName, direction: Direction): Future[Done] = throttle(node, target, direction, 0f) - private def requireTestConductorTranport(): Unit = if (!transport.defaultAddress.protocol.contains(".gremlin.trttl.")) - throw new ConfigurationException("To use this feature you must activate the failure injector adapters " + - "(gremlin, trttl) by specifying `testTransport(on = true)` in your MultiNodeConfig.") + private def requireTestConductorTranport(): Unit = + if (!transport.defaultAddress.protocol.contains(".gremlin.trttl.")) + throw new ConfigurationException("To use this feature you must activate the failure injector adapters " + + "(gremlin, trttl) by specifying `testTransport(on = true)` in your MultiNodeConfig.") /** * Switch the Netty pipeline of the remote support into pass through mode for @@ -377,7 +378,7 @@ private[akka] class Controller(private var initialParticipants: Int, controllerP case BarrierTimeout(data) ⇒ failBarrier(data) case FailedBarrier(data) ⇒ failBarrier(data) case BarrierEmpty(data, msg) ⇒ SupervisorStrategy.Resume - case WrongBarrier(name, client, data) ⇒ client ! ToClient(BarrierResult(name, false)); failBarrier(data) + case WrongBarrier(name, client, data) ⇒ { client ! ToClient(BarrierResult(name, false)); failBarrier(data) } case ClientLost(data, node) ⇒ failBarrier(data) case DuplicateNode(data, node) ⇒ failBarrier(data) } diff --git a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala index 4ec3ed2e84..584a9f2b27 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/testkit/LogRoleReplace.scala @@ -100,10 +100,11 @@ class LogRoleReplace { def process(in: BufferedReader, out: PrintWriter): Unit = { @tailrec - def processLines(line: String): Unit = if (line ne null) { - out.println(processLine(line)) - processLines(in.readLine) - } + def processLines(line: String): Unit = + if (line ne null) { + out.println(processLine(line)) + processLines(in.readLine) + } processLines(in.readLine()) } diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index f3d2d71301..f0e4f61043 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -298,17 +298,18 @@ private[remote] object EndpointManager { endpoint } - def unregisterEndpoint(endpoint: ActorRef): Unit = if (isWritable(endpoint)) { - val address = writableToAddress(endpoint) - addressToWritable.get(address) match { - case Some(policy) if policy.isTombstone ⇒ // There is already a tombstone directive, leave it there - case _ ⇒ addressToWritable -= address + def unregisterEndpoint(endpoint: ActorRef): Unit = + if (isWritable(endpoint)) { + val address = writableToAddress(endpoint) + addressToWritable.get(address) match { + case Some(policy) if policy.isTombstone ⇒ // There is already a tombstone directive, leave it there + case _ ⇒ addressToWritable -= address + } + writableToAddress -= endpoint + } else if (isReadOnly(endpoint)) { + addressToReadonly -= readonlyToAddress(endpoint) + readonlyToAddress -= endpoint } - writableToAddress -= endpoint - } else if (isReadOnly(endpoint)) { - addressToReadonly -= readonlyToAddress(endpoint) - readonlyToAddress -= endpoint - } def writableEndpointWithPolicyFor(address: Address): Option[EndpointPolicy] = addressToWritable.get(address) @@ -328,13 +329,14 @@ private[remote] object EndpointManager { case _ ⇒ false } - def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit = if (isWritable(endpoint)) { - addressToWritable += writableToAddress(endpoint) -> Gated(timeOfRelease) - writableToAddress -= endpoint - } else if (isReadOnly(endpoint)) { - addressToReadonly -= readonlyToAddress(endpoint) - readonlyToAddress -= endpoint - } + def markAsFailed(endpoint: ActorRef, timeOfRelease: Deadline): Unit = + if (isWritable(endpoint)) { + addressToWritable += writableToAddress(endpoint) -> Gated(timeOfRelease) + writableToAddress -= endpoint + } else if (isReadOnly(endpoint)) { + addressToReadonly -= readonlyToAddress(endpoint) + readonlyToAddress -= endpoint + } def markAsQuarantined(address: Address, reason: Throwable): Unit = addressToWritable += address -> Quarantined(reason) diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index 5643951ea4..e6fad8dd30 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -48,9 +48,10 @@ trait SchemeAugmenter { protected def augmentScheme(address: Address): Address = address.copy(protocol = augmentScheme(address.protocol)) - protected def removeScheme(scheme: String): String = if (scheme.endsWith(s".$addedSchemeIdentifier")) - scheme.take(scheme.length - addedSchemeIdentifier.length - 1) - else scheme + protected def removeScheme(scheme: String): String = + if (scheme.endsWith(s".$addedSchemeIdentifier")) + scheme.take(scheme.length - addedSchemeIdentifier.length - 1) + else scheme protected def removeScheme(address: Address): Address = address.copy(protocol = removeScheme(address.protocol)) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala index e3f204115b..ab32cbefe4 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/FailureInjectorTransportAdapter.scala @@ -135,7 +135,8 @@ private[remote] case class FailureInjectorHandle(_wrappedHandle: AssociationHand override def disassociate(): Unit = wrappedHandle.disassociate() - override def notify(ev: HandleEvent): Unit = if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress)) - upstreamListener notify ev + override def notify(ev: HandleEvent): Unit = + if (!gremlinAdapter.shouldDropInbound(wrappedHandle.remoteAddress)) + upstreamListener notify ev } diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 76d4d77b04..bc0c9e7c0e 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -70,9 +70,10 @@ object ThrottlerTransportAdapter { case class TokenBucket(capacity: Int, tokensPerSecond: Double, nanoTimeOfLastSend: Long, availableTokens: Int) extends ThrottleMode { - private def isAvailable(nanoTimeOfSend: Long, tokens: Int): Boolean = if ((tokens > capacity && availableTokens > 0)) { - true // Allow messages larger than capacity through, it will be recorded as negative tokens - } else min((availableTokens + tokensGenerated(nanoTimeOfSend)), capacity) >= tokens + private def isAvailable(nanoTimeOfSend: Long, tokens: Int): Boolean = + if ((tokens > capacity && availableTokens > 0)) { + true // Allow messages larger than capacity through, it will be recorded as negative tokens + } else min((availableTokens + tokensGenerated(nanoTimeOfSend)), capacity) >= tokens override def tryConsumeTokens(nanoTimeOfSend: Long, tokens: Int): (ThrottleMode, Boolean) = { if (isAvailable(nanoTimeOfSend, tokens)) diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala index c4c7e7c674..fc02383337 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/TcpSupport.scala @@ -68,10 +68,11 @@ private[remote] class TcpAssociationHandle(val localAddress: Address, val remote override val readHandlerPromise: Promise[HandleEventListener] = Promise() - override def write(payload: ByteString): Boolean = if (channel.isWritable && channel.isOpen) { - channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer)) - true - } else false + override def write(payload: ByteString): Boolean = + if (channel.isWritable && channel.isOpen) { + channel.write(ChannelBuffers.wrappedBuffer(payload.asByteBuffer)) + true + } else false override def disassociate(): Unit = NettyTransport.gracefulClose(channel) } diff --git a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala index 5b61c1e875..af4614aa6e 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/AkkaProtocolSpec.scala @@ -90,32 +90,35 @@ class AkkaProtocolSpec extends AkkaSpec("""akka.actor.provider = "akka.remote.Re (new TestFailureDetector, registry, transport, handle) } - def lastActivityIsHeartbeat(registry: AssociationRegistry) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { - case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ - codec.decodePdu(payload) match { - case Heartbeat ⇒ true - case _ ⇒ false - } - case _ ⇒ false - } + def lastActivityIsHeartbeat(registry: AssociationRegistry) = + if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { + case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ + codec.decodePdu(payload) match { + case Heartbeat ⇒ true + case _ ⇒ false + } + case _ ⇒ false + } - def lastActivityIsAssociate(registry: AssociationRegistry, cookie: Option[String]) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { - case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ - codec.decodePdu(payload) match { - case Associate(c, origin) if c == cookie && origin == localAddress ⇒ true - case _ ⇒ false - } - case _ ⇒ false - } + def lastActivityIsAssociate(registry: AssociationRegistry, cookie: Option[String]) = + if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { + case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ + codec.decodePdu(payload) match { + case Associate(c, origin) if c == cookie && origin == localAddress ⇒ true + case _ ⇒ false + } + case _ ⇒ false + } - def lastActivityIsDisassociate(registry: AssociationRegistry) = if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { - case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ - codec.decodePdu(payload) match { - case Disassociate ⇒ true - case _ ⇒ false - } - case _ ⇒ false - } + def lastActivityIsDisassociate(registry: AssociationRegistry) = + if (registry.logSnapshot.isEmpty) false else registry.logSnapshot.last match { + case WriteAttempt(sender, recipient, payload) if sender == localAddress && recipient == remoteAddress ⇒ + codec.decodePdu(payload) match { + case Disassociate ⇒ true + case _ ⇒ false + } + case _ ⇒ false + } "ProtocolStateActor" must { diff --git a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala index 4a33317ef3..de1b825ee1 100644 --- a/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/transport/GenericTransportSpec.scala @@ -25,10 +25,11 @@ abstract class GenericTransportSpec(withAkkaProtocol: Boolean = false) val nonExistingAddress = Address("test." + schemeIdentifier, "nosystem", "nohost", 0) def freshTransport(testTransport: TestTransport): Transport - def wrapTransport(transport: Transport): Transport = if (withAkkaProtocol) { - val provider = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider] - new AkkaProtocolTransport(transport, system, new AkkaProtocolSettings(provider.remoteSettings.config), AkkaPduProtobufCodec) - } else transport + def wrapTransport(transport: Transport): Transport = + if (withAkkaProtocol) { + val provider = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider] + new AkkaProtocolTransport(transport, system, new AkkaProtocolSettings(provider.remoteSettings.config), AkkaPduProtobufCodec) + } else transport def newTransportA(registry: AssociationRegistry): Transport = wrapTransport(freshTransport(new TestTransport(addressATest, registry)))