chore: bump scalafmt to 3.9.7 (#1916)
This commit is contained in:
parent
21ad8968b1
commit
9577b01b10
308 changed files with 689 additions and 688 deletions
|
|
@ -1,4 +1,4 @@
|
|||
version = 3.9.3
|
||||
version = 3.9.7
|
||||
runner.dialect = scala213
|
||||
project.git = true
|
||||
style = defaultWithAlign
|
||||
|
|
|
|||
|
|
@ -102,7 +102,7 @@ private[pekko] final class BehaviorTestKitImpl[T](
|
|||
def expectEffectClass[E <: Effect](effectClass: Class[E]): E = {
|
||||
context.effectQueue.poll() match {
|
||||
case null if effectClass.isAssignableFrom(NoEffects.getClass) => effectClass.cast(NoEffects)
|
||||
case null =>
|
||||
case null =>
|
||||
throw new AssertionError(s"expected: effect type ${effectClass.getName} but no effects were recorded")
|
||||
case effect if effectClass.isAssignableFrom(effect.getClass) => effect.asInstanceOf[E]
|
||||
case other => throw new AssertionError(s"expected: effect class ${effectClass.getName} but found $other")
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@ import pekko.annotation.InternalApi
|
|||
throw new IllegalStateException(
|
||||
s"$CapturingAppenderName not defined for [${loggerNameOrRoot(loggerName)}] in logback-test.xml")
|
||||
case appender: CapturingAppender => appender
|
||||
case other =>
|
||||
case other =>
|
||||
throw new IllegalStateException(s"Unexpected $CapturingAppender: $other")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ import scala.annotation.tailrec
|
|||
case ch.qos.logback.classic.Level.INFO_INT => Level.INFO
|
||||
case ch.qos.logback.classic.Level.WARN_INT => Level.WARN
|
||||
case ch.qos.logback.classic.Level.ERROR_INT => Level.ERROR
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalArgumentException("Level " + level.levelStr + ", " + level.levelInt + " is unknown.")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ private[pekko] final class FunctionRef[-T](override val path: ActorPath, send: (
|
|||
checkCurrentActorThread()
|
||||
_children.get(name) match {
|
||||
case Some(_) => throw classic.InvalidActorNameException(s"actor name $name is already taken")
|
||||
case None =>
|
||||
case None =>
|
||||
val btk = new BehaviorTestKitImpl[U](system, (path / name).withUid(rnd().nextInt()), behavior)
|
||||
_children += name -> btk
|
||||
btk.context.self
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ import java.util.Collections
|
|||
val logger = getLogbackLogger(loggerName)
|
||||
logger.getAppender(TestAppenderName) match {
|
||||
case testAppender: TestAppender => testAppender
|
||||
case null =>
|
||||
case null =>
|
||||
throw new IllegalStateException(s"No $TestAppenderName was setup for logger [${logger.getName}]")
|
||||
case other =>
|
||||
throw new IllegalStateException(
|
||||
|
|
|
|||
|
|
@ -313,8 +313,8 @@ private[pekko] final class TestProbeImpl[M](name: String, system: ActorSystem[_]
|
|||
ex)
|
||||
}
|
||||
outcome match {
|
||||
case FishingOutcome.Complete => (message :: seen).reverse
|
||||
case FishingOutcome.Fail(error) => assertFail(s"$error, hint: $hint")
|
||||
case FishingOutcome.Complete => (message :: seen).reverse
|
||||
case FishingOutcome.Fail(error) => assertFail(s"$error, hint: $hint")
|
||||
case continue: FishingOutcome.ContinueOutcome =>
|
||||
val newTimeout = timeout - (System.nanoTime() - start).nanos
|
||||
continue match {
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ object ManualTime {
|
|||
case adapter: SchedulerAdapter =>
|
||||
adapter.classicScheduler match {
|
||||
case sc: pekko.testkit.ExplicitlyTriggeredScheduler => new ManualTime(sc)
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(
|
||||
"ActorSystem not configured with explicitly triggered scheduler, " +
|
||||
"make sure to include org.apache.pekko.actor.testkit.typed.scaladsl.ManualTime.config() when setting up the test")
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ object ManualTime {
|
|||
case adapter: SchedulerAdapter =>
|
||||
adapter.classicScheduler match {
|
||||
case sc: pekko.testkit.ExplicitlyTriggeredScheduler => new ManualTime(sc)
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(
|
||||
"ActorSystem not configured with explicitly triggered scheduler, " +
|
||||
"make sure to include org.apache.pekko.actor.testkit.typed.scaladsl.ManualTime.config() when setting up the test")
|
||||
|
|
|
|||
|
|
@ -133,11 +133,11 @@ object BehaviorTestKitSpec {
|
|||
Behaviors.same
|
||||
case ScheduleCommand(key, delay, mode, cmd) =>
|
||||
mode match {
|
||||
case Effect.TimerScheduled.SingleMode => timers.startSingleTimer(key, cmd, delay)
|
||||
case Effect.TimerScheduled.FixedDelayMode => timers.startTimerWithFixedDelay(key, cmd, delay, delay)
|
||||
case Effect.TimerScheduled.SingleMode => timers.startSingleTimer(key, cmd, delay)
|
||||
case Effect.TimerScheduled.FixedDelayMode => timers.startTimerWithFixedDelay(key, cmd, delay, delay)
|
||||
case m: Effect.TimerScheduled.FixedDelayModeWithInitialDelay =>
|
||||
timers.startTimerWithFixedDelay(key, cmd, m.initialDelay, delay)
|
||||
case Effect.TimerScheduled.FixedRateMode => timers.startTimerAtFixedRate(key, cmd, delay, delay)
|
||||
case Effect.TimerScheduled.FixedRateMode => timers.startTimerAtFixedRate(key, cmd, delay, delay)
|
||||
case m: Effect.TimerScheduled.FixedRateModeWithInitialDelay =>
|
||||
timers.startTimerAtFixedRate(key, cmd, m.initialDelay, delay)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -66,9 +66,9 @@ object ActorRefSpec {
|
|||
class SenderActor(replyActor: ActorRef, latch: TestLatch) extends Actor {
|
||||
|
||||
def receive = {
|
||||
case "complex" => replyActor ! "complexRequest"
|
||||
case "complex2" => replyActor ! "complexRequest2"
|
||||
case "simple" => replyActor ! "simpleRequest"
|
||||
case "complex" => replyActor ! "complexRequest"
|
||||
case "complex2" => replyActor ! "complexRequest2"
|
||||
case "simple" => replyActor ! "simpleRequest"
|
||||
case "complexReply" => {
|
||||
latch.countDown()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,8 +61,8 @@ object DeathWatchSpec {
|
|||
|
||||
class WUWatcher extends Actor {
|
||||
def receive = {
|
||||
case W(ref) => context.watch(ref)
|
||||
case U(ref) => context.unwatch(ref)
|
||||
case W(ref) => context.watch(ref)
|
||||
case U(ref) => context.unwatch(ref)
|
||||
case Latches(t1: TestLatch, t2: TestLatch) =>
|
||||
t1.countDown()
|
||||
Await.ready(t2, 3.seconds)
|
||||
|
|
|
|||
|
|
@ -78,7 +78,7 @@ class DynamicAccessSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll
|
|||
// recoverWith doesn't work with scala 2.13.0-M5
|
||||
// https://github.com/scala/bug/issues/11242
|
||||
dynamicAccess.createInstanceFor[TestSuperclass](fqcn, Nil) match {
|
||||
case s: Success[TestSuperclass] => s
|
||||
case s: Success[TestSuperclass] => s
|
||||
case Failure(_: NoSuchMethodException) =>
|
||||
dynamicAccess
|
||||
.createInstanceFor[TestSuperclass](fqcn, immutable.Seq((classOf[String], "string ctor argument")))
|
||||
|
|
|
|||
|
|
@ -111,7 +111,7 @@ class HotSwapSpec extends PekkoSpec with ImplicitSender {
|
|||
val a = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "state" => sender() ! "0"
|
||||
case "swap" =>
|
||||
case "swap" =>
|
||||
context.become {
|
||||
case "state" => sender() ! "1"
|
||||
case "swapped" => sender() ! "swapped"
|
||||
|
|
|
|||
|
|
@ -120,7 +120,7 @@ class ReceiveTimeoutSpec extends PekkoSpec() {
|
|||
context.setReceiveTimeout(500.milliseconds)
|
||||
|
||||
def receive = {
|
||||
case Tick => ()
|
||||
case Tick => ()
|
||||
case ReceiveTimeout =>
|
||||
count.incrementAndGet
|
||||
timeoutLatch.open()
|
||||
|
|
|
|||
|
|
@ -325,7 +325,7 @@ object SupervisorHierarchySpec {
|
|||
// an indication of duplicate Terminate messages
|
||||
log :+= Event(s"${sender()} terminated while pongOfDeath", identityHashCode(Hierarchy.this))
|
||||
}
|
||||
case Abort => abort("terminating")
|
||||
case Abort => abort("terminating")
|
||||
case PingOfDeath =>
|
||||
if (size > 1) {
|
||||
pongsToGo = context.children.size
|
||||
|
|
@ -528,7 +528,7 @@ object SupervisorHierarchySpec {
|
|||
stay()
|
||||
case this.Event(Work, x) if x > 0 =>
|
||||
nextJob.next() match {
|
||||
case Ping(ref) => ref ! "ping"
|
||||
case Ping(ref) => ref ! "ping"
|
||||
case Fail(ref, dir) =>
|
||||
val f = Failure(
|
||||
dir,
|
||||
|
|
@ -547,7 +547,7 @@ object SupervisorHierarchySpec {
|
|||
if (idleChildren.nonEmpty) self ! Work
|
||||
else context.system.scheduler.scheduleOnce(workSchedule, self, Work)(context.dispatcher)
|
||||
stay().using(x - 1)
|
||||
case this.Event(Work, _) => if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing)
|
||||
case this.Event(Work, _) => if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing)
|
||||
case this.Event(Died(path), _) =>
|
||||
bury(path)
|
||||
stay()
|
||||
|
|
@ -605,7 +605,7 @@ object SupervisorHierarchySpec {
|
|||
}
|
||||
|
||||
when(Stopping, stateTimeout = 5.seconds.dilated) {
|
||||
case this.Event(PongOfDeath, _) => stay()
|
||||
case this.Event(PongOfDeath, _) => stay()
|
||||
case this.Event(Terminated(r), _) if r == hierarchy =>
|
||||
@nowarn
|
||||
val undead = children.filterNot(_.isTerminated)
|
||||
|
|
|
|||
|
|
@ -430,7 +430,7 @@ class SupervisorSpec
|
|||
}
|
||||
|
||||
def receive = {
|
||||
case Ping => sender() ! PongMessage
|
||||
case Ping => sender() ! PongMessage
|
||||
case DieReply =>
|
||||
val e = new RuntimeException("Expected")
|
||||
sender() ! Status.Failure(e)
|
||||
|
|
|
|||
|
|
@ -99,7 +99,7 @@ object ActorModelSpec {
|
|||
case Increment(count) => { ack(); count.incrementAndGet(); busy.switchOff(()) }
|
||||
case CountDownNStop(l) => { ack(); l.countDown(); context.stop(self); busy.switchOff(()) }
|
||||
case Restart => { ack(); busy.switchOff(()); throw new Exception("Restart requested") }
|
||||
case Interrupt => {
|
||||
case Interrupt => {
|
||||
ack(); sender() ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!")));
|
||||
busy.switchOff(()); throw new InterruptedException("Ping!")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -205,7 +205,7 @@ class PriorityMailboxSpec extends MailboxSpec {
|
|||
val comparator = PriorityGenerator(_.##)
|
||||
lazy val name = "The priority mailbox implementation"
|
||||
def factory = {
|
||||
case UnboundedMailbox() => new UnboundedPriorityMailbox(comparator).create(None, None)
|
||||
case UnboundedMailbox() => new UnboundedPriorityMailbox(comparator).create(None, None)
|
||||
case BoundedMailbox(capacity, pushTimeOut) =>
|
||||
new BoundedPriorityMailbox(comparator, capacity, pushTimeOut).create(None, None)
|
||||
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser
|
||||
|
|
@ -216,7 +216,7 @@ class StablePriorityMailboxSpec extends MailboxSpec {
|
|||
val comparator = PriorityGenerator(_.##)
|
||||
lazy val name = "The stable priority mailbox implementation"
|
||||
def factory = {
|
||||
case UnboundedMailbox() => new UnboundedStablePriorityMailbox(comparator).create(None, None)
|
||||
case UnboundedMailbox() => new UnboundedStablePriorityMailbox(comparator).create(None, None)
|
||||
case BoundedMailbox(capacity, pushTimeOut) =>
|
||||
new BoundedStablePriorityMailbox(comparator, capacity, pushTimeOut).create(None, None)
|
||||
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser
|
||||
|
|
@ -226,7 +226,7 @@ class StablePriorityMailboxSpec extends MailboxSpec {
|
|||
class ControlAwareMailboxSpec extends MailboxSpec {
|
||||
lazy val name = "The control aware mailbox implementation"
|
||||
def factory = {
|
||||
case UnboundedMailbox() => new UnboundedControlAwareMailbox().create(None, None)
|
||||
case UnboundedMailbox() => new UnboundedControlAwareMailbox().create(None, None)
|
||||
case BoundedMailbox(capacity, pushTimeOut) =>
|
||||
new BoundedControlAwareMailbox(capacity, pushTimeOut).create(None, None)
|
||||
case _ => throw new RuntimeException() // compiler exhaustiveness check pleaser
|
||||
|
|
|
|||
|
|
@ -47,7 +47,7 @@ object BackoffSupervisorSpec {
|
|||
class ManualChild(probe: ActorRef) extends Actor {
|
||||
def receive: Receive = {
|
||||
case "boom" => throw new TestException
|
||||
case msg =>
|
||||
case msg =>
|
||||
probe ! msg
|
||||
context.parent ! BackoffSupervisor.Reset
|
||||
}
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ class BroadcastSpec extends PekkoSpec with DefaultTimeout with ImplicitSender {
|
|||
val counter1 = new AtomicInteger
|
||||
val actor1 = system.actorOf(Props(new Actor {
|
||||
def receive = {
|
||||
case "end" => doneLatch.countDown()
|
||||
case "end" => doneLatch.countDown()
|
||||
case msg: Int =>
|
||||
counter1.addAndGet(msg)
|
||||
sender() ! "ack"
|
||||
|
|
|
|||
|
|
@ -34,8 +34,9 @@ class RouteeCreationSpec extends PekkoSpec {
|
|||
})))
|
||||
for (i <- 1 to N) {
|
||||
expectMsgType[ActorIdentity] match {
|
||||
case ActorIdentity(_, Some(_)) => // fine
|
||||
case x => fail(s"routee $i was not found $x")
|
||||
case ActorIdentity(_,
|
||||
Some(_)) => // fine
|
||||
case x => fail(s"routee $i was not found $x")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -41,7 +41,7 @@ object ScatterGatherFirstCompletedSpec {
|
|||
case Stop(None) => context.stop(self)
|
||||
case Stop(Some(_id)) if _id == id => context.stop(self)
|
||||
case _id: Int if _id == id =>
|
||||
case _ => {
|
||||
case _ => {
|
||||
Thread.sleep(100 * id)
|
||||
sender() ! id
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ object TailChoppingSpec {
|
|||
def receive = {
|
||||
case "stop" => context.stop(self)
|
||||
case "times" => sender() ! times
|
||||
case _ =>
|
||||
case _ =>
|
||||
times += 1
|
||||
Thread.sleep(sleepTime.toMillis)
|
||||
sender() ! "ack"
|
||||
|
|
|
|||
|
|
@ -509,7 +509,7 @@ abstract class ActorContextSpec extends ScalaTestWithActorTestKit with AnyWordSp
|
|||
}
|
||||
.receiveSignal {
|
||||
case (_, Terminated(_)) => Behaviors.unhandled
|
||||
case (_, signal) =>
|
||||
case (_, signal) =>
|
||||
probe.ref ! ReceivedSignal(signal)
|
||||
Behaviors.same
|
||||
}
|
||||
|
|
|
|||
|
|
@ -70,7 +70,7 @@ object OrElseSpec {
|
|||
// this could be provided as a general purpose utility
|
||||
@tailrec def handle(command: Ping, handlers: List[Ping => Behavior[Ping]]): Behavior[Ping] = {
|
||||
handlers match {
|
||||
case Nil => Behaviors.unhandled
|
||||
case Nil => Behaviors.unhandled
|
||||
case head :: tail =>
|
||||
val next = head(command)
|
||||
if (Behavior.isUnhandled(next)) handle(command, tail)
|
||||
|
|
|
|||
|
|
@ -1270,7 +1270,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
|
|||
"replace supervision when new returned behavior catches same exception" in {
|
||||
val probe = TestProbe[AnyRef]("probeMcProbeFace")
|
||||
val behv = supervise[String](Behaviors.receiveMessage {
|
||||
case "boom" => throw TestException("boom indeed")
|
||||
case "boom" => throw TestException("boom indeed")
|
||||
case "switch" =>
|
||||
supervise[String](
|
||||
supervise[String](supervise[String](supervise[String](supervise[String](Behaviors.receiveMessage {
|
||||
|
|
@ -1320,7 +1320,7 @@ class SupervisionSpec extends ScalaTestWithActorTestKit("""
|
|||
}
|
||||
|
||||
val behv = supervise[String](Behaviors.receiveMessage {
|
||||
case "boom" => throw TestException("boom indeed")
|
||||
case "boom" => throw TestException("boom indeed")
|
||||
case "switch" =>
|
||||
supervise[String](setup(_ =>
|
||||
supervise[String](
|
||||
|
|
|
|||
|
|
@ -56,7 +56,7 @@ object TestProducerWithAsk {
|
|||
Behaviors.receivePartial {
|
||||
case (_, Tick) => Behaviors.same
|
||||
case (_, RequestNext(sendTo)) => active(n + 1, replyProbe, sendTo)
|
||||
case (_, Confirmed(seqNr)) =>
|
||||
case (_, Confirmed(seqNr)) =>
|
||||
replyProbe ! seqNr
|
||||
Behaviors.same
|
||||
case (ctx, AskTimeout) =>
|
||||
|
|
|
|||
|
|
@ -151,7 +151,7 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
|
|||
"contain the object class name where the first log was called" in {
|
||||
val eventFilter = LoggingTestKit.custom {
|
||||
case event if event.loggerName == WhereTheBehaviorIsDefined.getClass.getName => true
|
||||
case other =>
|
||||
case other =>
|
||||
println(other.loggerName)
|
||||
false
|
||||
}
|
||||
|
|
@ -162,7 +162,7 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
|
|||
"contain the abstract behavior class name where the first log was called" in {
|
||||
val eventFilter = LoggingTestKit.custom {
|
||||
case event if event.loggerName == classOf[BehaviorWhereTheLoggerIsUsed].getName => true
|
||||
case other =>
|
||||
case other =>
|
||||
println(other.loggerName)
|
||||
false
|
||||
}
|
||||
|
|
|
|||
|
|
@ -647,7 +647,7 @@ class UnstashingSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
|
|||
def unstashing(n: Int): Behavior[String] =
|
||||
Behaviors.receiveMessagePartial {
|
||||
case "unhandled" => Behaviors.unhandled
|
||||
case "handled" =>
|
||||
case "handled" =>
|
||||
probe.ref ! s"handled $n"
|
||||
unstashing(n + 1)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -143,8 +143,8 @@ object AdapterSpec {
|
|||
}
|
||||
|
||||
def receive = {
|
||||
case "send" => ref ! Ping(self) // implicit conversion
|
||||
case "pong" => probe ! "ok"
|
||||
case "send" => ref ! Ping(self) // implicit conversion
|
||||
case "pong" => probe ! "ok"
|
||||
case "spawn" =>
|
||||
val child = context.spawnAnonymous(typed2)
|
||||
child ! Ping(self)
|
||||
|
|
|
|||
|
|
@ -185,7 +185,7 @@ object Behavior {
|
|||
(behavior._tag: @switch) match {
|
||||
case BehaviorTags.SameBehavior => current
|
||||
case BehaviorTags.UnhandledBehavior => current
|
||||
case BehaviorTags.DeferredBehavior =>
|
||||
case BehaviorTags.DeferredBehavior =>
|
||||
val deferred = behavior.asInstanceOf[DeferredBehavior[T]]
|
||||
canonicalize(deferred(ctx), deferred, ctx)
|
||||
case _ => behavior
|
||||
|
|
@ -219,7 +219,7 @@ object Behavior {
|
|||
@tailrec
|
||||
def loop(b: Behavior[T]): Boolean =
|
||||
b match {
|
||||
case _ if p(b) => true
|
||||
case _ if p(b) => true
|
||||
case wrappingBehavior: InterceptorImpl[T, T] @unchecked =>
|
||||
loop(wrappingBehavior.nestedBehavior)
|
||||
case d: DeferredBehavior[T] =>
|
||||
|
|
|
|||
|
|
@ -167,7 +167,7 @@ object ProducerController {
|
|||
def apply(config: Config): Settings = {
|
||||
val chunkLargeMessagesBytes = toRootLowerCase(config.getString("chunk-large-messages")) match {
|
||||
case "off" => 0
|
||||
case _ =>
|
||||
case _ =>
|
||||
config.getBytes("chunk-large-messages").requiring(_ <= Int.MaxValue, "Too large chunk-large-messages.").toInt
|
||||
}
|
||||
new Settings(
|
||||
|
|
|
|||
|
|
@ -658,7 +658,7 @@ private class ConsumerControllerImpl[A] private (
|
|||
if (retryTimer.interval() != retryTimer.minBackoff)
|
||||
context.log.debug("Schedule next retry in [{} ms]", retryTimer.interval().toMillis)
|
||||
s.registering match {
|
||||
case None => nextBehavior()
|
||||
case None => nextBehavior()
|
||||
case Some(reg) =>
|
||||
reg ! ProducerController.RegisterConsumer(context.self)
|
||||
Behaviors.same
|
||||
|
|
|
|||
|
|
@ -274,13 +274,13 @@ object ProducerControllerImpl {
|
|||
case RegisterConsumer(c: ActorRef[ConsumerController.Command[A]] @unchecked) =>
|
||||
(producer, initialState) match {
|
||||
case (Some(p), Some(s)) => thenBecomeActive(p, c, s)
|
||||
case (_, _) =>
|
||||
case (_, _) =>
|
||||
waitingForInitialization(context, producer, Some(c), durableQueue, settings, initialState)(thenBecomeActive)
|
||||
}
|
||||
case start: Start[A] @unchecked =>
|
||||
(consumerController, initialState) match {
|
||||
case (Some(c), Some(s)) => thenBecomeActive(start.producer, c, s)
|
||||
case (_, _) =>
|
||||
case (_, _) =>
|
||||
waitingForInitialization(
|
||||
context,
|
||||
Some(start.producer),
|
||||
|
|
@ -292,7 +292,7 @@ object ProducerControllerImpl {
|
|||
case load: LoadStateReply[A] @unchecked =>
|
||||
(producer, consumerController) match {
|
||||
case (Some(p), Some(c)) => thenBecomeActive(p, c, load.state)
|
||||
case (_, _) =>
|
||||
case (_, _) =>
|
||||
waitingForInitialization(context, producer, consumerController, durableQueue, settings, Some(load.state))(
|
||||
thenBecomeActive)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -328,7 +328,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
|
|||
} else {
|
||||
selectWorker() match {
|
||||
case Some(w) => Right(w)
|
||||
case None =>
|
||||
case None =>
|
||||
checkStashFull(stashBuffer)
|
||||
context.log.debug("Stashing message, seqNr [{}]", totalSeqNr)
|
||||
stashBuffer.stash(Msg(msg, wasStashed = true, replyTo))
|
||||
|
|
@ -507,7 +507,7 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
|
|||
if (traceEnabled)
|
||||
context.log.trace("Received Ack seqNr [{}] from worker [{}].", confirmedSeqNr, outState.confirmationQualifier)
|
||||
confirmed.foreach {
|
||||
case Unconfirmed(_, _, _, None) => // no reply
|
||||
case Unconfirmed(_, _, _, None) => // no reply
|
||||
case Unconfirmed(_, _, _, Some(replyTo)) =>
|
||||
replyTo ! Done
|
||||
}
|
||||
|
|
|
|||
|
|
@ -114,7 +114,7 @@ import scala.util.Success
|
|||
// context-shared timer needed to allow for nested timer usage
|
||||
def timer: TimerSchedulerCrossDslSupport[T] = _timer match {
|
||||
case OptionVal.Some(timer) => timer
|
||||
case _ =>
|
||||
case _ =>
|
||||
checkCurrentActorThread()
|
||||
val timer = mkTimer()
|
||||
_timer = OptionVal.Some(timer)
|
||||
|
|
@ -161,7 +161,7 @@ import scala.util.Success
|
|||
// lazy init of logging setup
|
||||
_logging match {
|
||||
case OptionVal.Some(l) => l
|
||||
case _ =>
|
||||
case _ =>
|
||||
val logClass = LoggerClass.detectLoggerClassFromStack(classOf[Behavior[_]])
|
||||
val logger = LoggerFactory.getLogger(logClass.getName)
|
||||
val l = LoggingContext(logger, classicActorContext.props.deploy.tags, this)
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ private[pekko] trait ExtensionsImpl extends Extensions { self: ActorSystem[_] wi
|
|||
|
||||
idTry match {
|
||||
case Success(id: ExtensionId[_]) => registerExtension(id)
|
||||
case Success(_) =>
|
||||
case Success(_) =>
|
||||
if (!throwOnLoadFail) log.error("[{}] is not an 'ExtensionId', skipping...", extensionIdFQCN)
|
||||
else throw new RuntimeException(s"[$extensionIdFQCN] is not an 'ExtensionId'")
|
||||
case Failure(problem) =>
|
||||
|
|
@ -100,7 +100,7 @@ private[pekko] trait ExtensionsImpl extends Extensions { self: ActorSystem[_] wi
|
|||
}
|
||||
.getOrElse(ext.createExtension(self))
|
||||
instance match {
|
||||
case null => throw new IllegalStateException(s"Extension instance created as 'null' for extension [$ext]")
|
||||
case null => throw new IllegalStateException(s"Extension instance created as 'null' for extension [$ext]")
|
||||
case nonNull =>
|
||||
val instance = nonNull.asInstanceOf[T]
|
||||
// Replace our in process signal with the initialized extension
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ private[pekko] object LoggerClass {
|
|||
try {
|
||||
def skip(name: String): Boolean = {
|
||||
def loop(skipList: List[String]): Boolean = skipList match {
|
||||
case Nil => false
|
||||
case Nil => false
|
||||
case head :: tail =>
|
||||
if (name.startsWith(head)) true
|
||||
else loop(tail)
|
||||
|
|
|
|||
|
|
@ -35,19 +35,19 @@ import pekko.serialization.{ BaseSerializer, SerializerWithStringManifest }
|
|||
|
||||
def manifest(o: AnyRef): String = o match {
|
||||
case _: ActorRef[_] => ActorRefManifest
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
|
||||
}
|
||||
|
||||
def toBinary(o: AnyRef): Array[Byte] = o match {
|
||||
case ref: ActorRef[_] => resolver.toSerializationFormat(ref).getBytes(StandardCharsets.UTF_8)
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
|
||||
}
|
||||
|
||||
def fromBinary(bytes: Array[Byte], manifest: String): ActorRef[Any] = manifest match {
|
||||
case ActorRefManifest => resolver.resolveActorRef(new String(bytes, StandardCharsets.UTF_8))
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new NotSerializableException(
|
||||
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -28,7 +28,7 @@ private[pekko] object ActorContextAdapter {
|
|||
private def toClassicImp[U](context: TypedActorContext[_]): classic.ActorContext =
|
||||
context match {
|
||||
case adapter: ActorContextAdapter[_] => adapter.classicContext
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new UnsupportedOperationException(
|
||||
"Only adapted classic ActorContext permissible " +
|
||||
s"($context of class ${context.getClass.getName})")
|
||||
|
|
|
|||
|
|
@ -61,7 +61,7 @@ private[pekko] object ActorRefAdapter {
|
|||
ref match {
|
||||
case adapter: ActorRefAdapter[_] => adapter.classicRef
|
||||
case adapter: ActorSystemAdapter[_] => adapter.system.guardian
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new UnsupportedOperationException(
|
||||
"Only adapted classic ActorRefs permissible " +
|
||||
s"($ref of class ${ref.getClass.getName})")
|
||||
|
|
@ -69,8 +69,8 @@ private[pekko] object ActorRefAdapter {
|
|||
|
||||
def sendSystemMessage(classicRef: pekko.actor.InternalActorRef, signal: internal.SystemMessage): Unit =
|
||||
signal match {
|
||||
case internal.Create() => throw new IllegalStateException("WAT? No, seriously.")
|
||||
case internal.Terminate() => classicRef.stop()
|
||||
case internal.Create() => throw new IllegalStateException("WAT? No, seriously.")
|
||||
case internal.Terminate() => classicRef.stop()
|
||||
case internal.Watch(watchee, watcher) =>
|
||||
classicRef.sendSystemMessage(sysmsg.Watch(toClassic(watchee), toClassic(watcher)))
|
||||
case internal.Unwatch(watchee, watcher) =>
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ import pekko.annotation.InternalApi
|
|||
def toClassic(scheduler: Scheduler): pekko.actor.Scheduler =
|
||||
scheduler match {
|
||||
case s: SchedulerAdapter => s.classicScheduler
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new UnsupportedOperationException(
|
||||
"unknown Scheduler type " +
|
||||
s"($scheduler of class ${scheduler.getClass.getName})")
|
||||
|
|
|
|||
|
|
@ -29,13 +29,13 @@ final class ServiceKeySerializer(val system: pekko.actor.ExtendedActorSystem)
|
|||
with BaseSerializer {
|
||||
def manifest(o: AnyRef): String = o match {
|
||||
case key: DefaultServiceKey[_] => key.typeName
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
|
||||
}
|
||||
|
||||
def toBinary(o: AnyRef): Array[Byte] = o match {
|
||||
case serviceKey: DefaultServiceKey[_] => serviceKey.id.getBytes(StandardCharsets.UTF_8)
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -52,7 +52,7 @@ abstract class AbstractBehavior[T](context: ActorContext[T]) extends ExtensibleB
|
|||
private var _receive: OptionVal[Receive[T]] = OptionVal.None
|
||||
private def receive: Receive[T] = _receive match {
|
||||
case OptionVal.Some(r) => r
|
||||
case _ =>
|
||||
case _ =>
|
||||
val receive = createReceive
|
||||
_receive = OptionVal.Some(receive)
|
||||
receive
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ object AskPattern {
|
|||
// because it might be needed when we move to a 'native' typed runtime, see #24219
|
||||
ref match {
|
||||
case a: InternalRecipientRef[Req] => askClassic[Req, Res](a, timeout, replyTo)
|
||||
case a =>
|
||||
case a =>
|
||||
throw new IllegalStateException(
|
||||
"Only expect references to be RecipientRef, ActorRefAdapter or ActorSystemAdapter until " +
|
||||
"native system is implemented: " + a.getClass)
|
||||
|
|
|
|||
|
|
@ -425,7 +425,7 @@ object ByteString {
|
|||
else
|
||||
that match {
|
||||
case b: ByteString1C => ByteStrings(this, b.toByteString1)
|
||||
case b: ByteString1 =>
|
||||
case b: ByteString1 =>
|
||||
if ((bytes eq b.bytes) && (startIndex + length == b.startIndex))
|
||||
new ByteString1(bytes, startIndex, length + b.length)
|
||||
else ByteStrings(this, b)
|
||||
|
|
@ -1211,7 +1211,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
|
|||
}
|
||||
override def ++=(xs: TraversableOnce[Byte]): this.type = {
|
||||
xs match {
|
||||
case bs: ByteString => ++=(bs)
|
||||
case bs: ByteString => ++=(bs)
|
||||
case xs: WrappedArray.ofByte =>
|
||||
if (xs.nonEmpty) putByteArrayUnsafe(xs.array.clone)
|
||||
case seq: collection.IndexedSeq[Byte] if shouldResizeTempFor(seq.length) =>
|
||||
|
|
|
|||
|
|
@ -426,7 +426,7 @@ object ByteString {
|
|||
else
|
||||
that match {
|
||||
case b: ByteString1C => ByteStrings(this, b.toByteString1)
|
||||
case b: ByteString1 =>
|
||||
case b: ByteString1 =>
|
||||
if ((bytes eq b.bytes) && (startIndex + length == b.startIndex))
|
||||
new ByteString1(bytes, startIndex, length + b.length)
|
||||
else ByteStrings(this, b)
|
||||
|
|
@ -1259,7 +1259,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
|
|||
|
||||
override def addAll(xs: IterableOnce[Byte]): this.type = {
|
||||
xs match {
|
||||
case bs: ByteString => addAll(bs)
|
||||
case bs: ByteString => addAll(bs)
|
||||
case xs: WrappedArray.ofByte =>
|
||||
if (xs.nonEmpty) putByteArrayUnsafe(xs.array.clone)
|
||||
case seq: collection.IndexedSeq[Byte] if shouldResizeTempFor(seq.length) =>
|
||||
|
|
|
|||
|
|
@ -427,7 +427,7 @@ object ByteString {
|
|||
else
|
||||
that match {
|
||||
case b: ByteString1C => ByteStrings(this, b.toByteString1)
|
||||
case b: ByteString1 =>
|
||||
case b: ByteString1 =>
|
||||
if ((bytes eq b.bytes) && (startIndex + length == b.startIndex))
|
||||
new ByteString1(bytes, startIndex, length + b.length)
|
||||
else ByteStrings(this, b)
|
||||
|
|
@ -1262,7 +1262,7 @@ final class ByteStringBuilder extends Builder[Byte, ByteString] {
|
|||
|
||||
override def addAll(xs: IterableOnce[Byte]): this.type = {
|
||||
xs match {
|
||||
case bs: ByteString => addAll(bs)
|
||||
case bs: ByteString => addAll(bs)
|
||||
case xs: WrappedArray.ofByte =>
|
||||
if (xs.nonEmpty) putByteArrayUnsafe(xs.array.clone)
|
||||
case seq: collection.IndexedSeq[Byte] if shouldResizeTempFor(seq.length) =>
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ private[pekko] trait AbstractProps {
|
|||
val actorClass = Reflect.findMarker(cc, coc) match {
|
||||
case t: ParameterizedType =>
|
||||
t.getActualTypeArguments.head match {
|
||||
case c: Class[_] => c // since T <: Actor
|
||||
case c: Class[_] => c // since T <: Actor
|
||||
case v: TypeVariable[_] =>
|
||||
v.getBounds.collectFirst { case c: Class[_] if ac.isAssignableFrom(c) && c != ac => c }.getOrElse(ac)
|
||||
case x => throw new IllegalArgumentException(s"unsupported type found in Creator argument [$x]")
|
||||
|
|
|
|||
|
|
@ -576,7 +576,7 @@ private[pekko] class ActorCell(
|
|||
case PoisonPill => self.stop()
|
||||
case sel: ActorSelectionMessage => receiveSelection(sel)
|
||||
case Identify(messageId) => sender() ! ActorIdentity(messageId, Some(self))
|
||||
case unexpected =>
|
||||
case unexpected =>
|
||||
throw new RuntimeException(s"Unexpected message for autoreceive: $unexpected") // for exhaustiveness check, will not happen
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -137,7 +137,7 @@ object ActorPath {
|
|||
def validate(pos: Int): Int =
|
||||
if (pos < len)
|
||||
s.charAt(pos) match {
|
||||
case c if isValidChar(c) => validate(pos + 1)
|
||||
case c if isValidChar(c) => validate(pos + 1)
|
||||
case '%' if pos + 2 < len && isHexChar(s.charAt(pos + 1)) && isHexChar(s.charAt(pos + 2)) =>
|
||||
validate(pos + 3)
|
||||
case _ => pos
|
||||
|
|
|
|||
|
|
@ -682,7 +682,7 @@ private[pekko] class EmptyLocalActorRef(
|
|||
}
|
||||
|
||||
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = message match {
|
||||
case null => throw InvalidMessageException("Message is null")
|
||||
case null => throw InvalidMessageException("Message is null")
|
||||
case d: DeadLetter =>
|
||||
specialHandle(d.message, d.sender) // do NOT form endless loops, since deadLetters will resend!
|
||||
case _ if !specialHandle(message, sender) =>
|
||||
|
|
@ -696,7 +696,7 @@ private[pekko] class EmptyLocalActorRef(
|
|||
w.watcher.sendSystemMessage(
|
||||
DeathWatchNotification(w.watchee, existenceConfirmed = false, addressTerminated = false))
|
||||
true
|
||||
case _: Unwatch => true // Just ignore
|
||||
case _: Unwatch => true // Just ignore
|
||||
case Identify(messageId) =>
|
||||
sender ! ActorIdentity(messageId, None)
|
||||
true
|
||||
|
|
@ -707,7 +707,7 @@ private[pekko] class EmptyLocalActorRef(
|
|||
case None =>
|
||||
sel.msg match {
|
||||
case m: DeadLetterSuppression => publishSupressedDeadLetter(m, sender)
|
||||
case _ =>
|
||||
case _ =>
|
||||
eventStream.publish(
|
||||
DeadLetter(sel.msg, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
|
||||
}
|
||||
|
|
@ -737,7 +737,7 @@ private[pekko] class DeadLetterActorRef(_provider: ActorRefProvider, _path: Acto
|
|||
case null => throw InvalidMessageException("Message is null")
|
||||
case Identify(messageId) => sender ! ActorIdentity(messageId, None)
|
||||
case d: DeadLetter => if (!specialHandle(d.message, d.sender)) eventStream.publish(d)
|
||||
case _ =>
|
||||
case _ =>
|
||||
if (!specialHandle(message, sender))
|
||||
eventStream.publish(DeadLetter(message, if (sender eq Actor.noSender) provider.deadLetters else sender, this))
|
||||
}
|
||||
|
|
@ -902,8 +902,8 @@ private[pekko] class VirtualPathContainer(
|
|||
|
||||
override def sendSystemMessage(message: SystemMessage): Unit = {
|
||||
message match {
|
||||
case w: Watch => addWatcher(w.watchee, w.watcher)
|
||||
case u: Unwatch => remWatcher(u.watchee, u.watcher)
|
||||
case w: Watch => addWatcher(w.watchee, w.watcher)
|
||||
case u: Unwatch => remWatcher(u.watchee, u.watcher)
|
||||
case DeathWatchNotification(actorRef, _, _) =>
|
||||
this.!(Terminated(actorRef)(existenceConfirmed = true, addressTerminated = false))(actorRef)
|
||||
case _ => // ignore all other messages
|
||||
|
|
|
|||
|
|
@ -347,7 +347,7 @@ private[pekko] object LocalActorRefProvider {
|
|||
// a registered, and watched termination hook terminated before
|
||||
// termination process of guardian has started
|
||||
terminationHooks -= a
|
||||
case StopChild(child) => context.stop(child)
|
||||
case StopChild(child) => context.stop(child)
|
||||
case RegisterTerminationHook if sender() != context.system.deadLetters =>
|
||||
terminationHooks += sender()
|
||||
context.watch(sender())
|
||||
|
|
@ -561,7 +561,7 @@ private[pekko] class LocalActorRefProvider private[pekko] (
|
|||
// make user provided guardians not run on internal dispatcher
|
||||
val dispatcher =
|
||||
system.guardianProps match {
|
||||
case None => internalDispatcher
|
||||
case None => internalDispatcher
|
||||
case Some(props) =>
|
||||
val dispatcherId =
|
||||
if (props.deploy.dispatcher == Deploy.DispatcherSameAsParent) Dispatchers.DefaultDispatcherId
|
||||
|
|
@ -621,7 +621,7 @@ private[pekko] class LocalActorRefProvider private[pekko] (
|
|||
|
||||
def resolveActorRef(path: String): ActorRef = path match {
|
||||
case ActorPathExtractor(address, elems) if address == rootPath.address => resolveActorRef(rootGuardian, elems)
|
||||
case _ =>
|
||||
case _ =>
|
||||
logDeser.debug("Resolve (deserialization) of unknown (invalid) path [{}], using deadLetters.", path)
|
||||
deadLetters
|
||||
}
|
||||
|
|
@ -766,7 +766,7 @@ private[pekko] class LocalActorRefProvider private[pekko] (
|
|||
Serialization.Information(getDefaultAddress, system)
|
||||
serializationInformationCache match {
|
||||
case OptionVal.Some(info) => info
|
||||
case _ =>
|
||||
case _ =>
|
||||
if (system eq null)
|
||||
throw new IllegalStateException("Too early access of serializationInformation")
|
||||
else {
|
||||
|
|
@ -783,7 +783,7 @@ private[pekko] class LocalActorRefProvider private[pekko] (
|
|||
override private[pekko] def addressString: String = {
|
||||
_addressString match {
|
||||
case OptionVal.Some(addr) => addr
|
||||
case _ =>
|
||||
case _ =>
|
||||
val addr = getDefaultAddress.toString
|
||||
_addressString = OptionVal.Some(addr)
|
||||
addr
|
||||
|
|
|
|||
|
|
@ -1160,7 +1160,7 @@ private[pekko] class ActorSystemImpl(
|
|||
}
|
||||
findExtension(ext) // Registration in process, await completion and retry
|
||||
case t: Throwable => throw t // Initialization failed, throw same again
|
||||
case other =>
|
||||
case other =>
|
||||
other.asInstanceOf[T] // could be a T or null, in which case we return the null as T
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -726,7 +726,7 @@ final class CoordinatedShutdown private[pekko] (
|
|||
log.info("Running CoordinatedShutdown with reason [{}]", reason)
|
||||
def loop(remainingPhases: List[String]): Future[Done] = {
|
||||
remainingPhases match {
|
||||
case Nil => Future.successful(Done)
|
||||
case Nil => Future.successful(Done)
|
||||
case phaseName :: remaining if !phases(phaseName).enabled =>
|
||||
tasks.get(phaseName).foreach { phaseDef =>
|
||||
log.info(s"Phase [{}] disabled through configuration, skipping [{}] tasks.", phaseName, phaseDef.size)
|
||||
|
|
@ -810,7 +810,7 @@ final class CoordinatedShutdown private[pekko] (
|
|||
def timeout(phase: String): FiniteDuration =
|
||||
phases.get(phase) match {
|
||||
case Some(p) => p.timeout
|
||||
case None =>
|
||||
case None =>
|
||||
throw new IllegalArgumentException(s"Unknown phase [$phase]. All phases must be defined in configuration")
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -307,7 +307,7 @@ private[pekko] class Deployer(val settings: ActorSystem.Settings, val dynamicAcc
|
|||
.createInstanceFor[RouterConfig](fqn, args1)
|
||||
.recover {
|
||||
case e @ (_: IllegalArgumentException | _: ConfigException) => throw e
|
||||
case e: NoSuchMethodException =>
|
||||
case e: NoSuchMethodException =>
|
||||
dynamicAccess
|
||||
.createInstanceFor[RouterConfig](fqn, args2)
|
||||
.recover {
|
||||
|
|
|
|||
|
|
@ -869,7 +869,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
|
|||
private[pekko] def applyState(nextState: State): Unit = {
|
||||
nextState.stopReason match {
|
||||
case None => makeTransition(nextState)
|
||||
case _ =>
|
||||
case _ =>
|
||||
nextState.replies.reverse.foreach { r =>
|
||||
sender() ! r
|
||||
}
|
||||
|
|
@ -901,7 +901,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
|
|||
currentState.timeout match {
|
||||
case SomeMaxFiniteDuration => // effectively disable stateTimeout
|
||||
case Some(d: FiniteDuration) if d.length >= 0 => timeoutFuture = scheduleTimeout(d)
|
||||
case _ =>
|
||||
case _ =>
|
||||
val timeout = stateTimeouts(currentState.stateName)
|
||||
if (timeout.isDefined) timeoutFuture = scheduleTimeout(timeout.get)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -412,7 +412,7 @@ abstract class SupervisorStrategy {
|
|||
}
|
||||
decision match {
|
||||
case Escalate => // don't log here
|
||||
case d =>
|
||||
case d =>
|
||||
if (d.logLevel == Logging.ErrorLevel)
|
||||
publish(context, Error(cause, child.path.toString, getClass, logMessage))
|
||||
else
|
||||
|
|
|
|||
|
|
@ -268,7 +268,7 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
|
|||
case null => ()
|
||||
case node =>
|
||||
node.value.ticks match {
|
||||
case 0 => node.value.executeTask()
|
||||
case 0 => node.value.executeTask()
|
||||
case ticks =>
|
||||
val futureTick = ((
|
||||
time - start + // calculate the nanos since timer start
|
||||
|
|
@ -344,7 +344,7 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
|
|||
}
|
||||
stopped.get match {
|
||||
case null => nextTick()
|
||||
case p =>
|
||||
case p =>
|
||||
assert(stopped.compareAndSet(p, Promise.successful(Nil)), "Stop signal violated in LARS")
|
||||
p.success(clearAll())
|
||||
}
|
||||
|
|
@ -376,13 +376,13 @@ object LightArrayRevolverScheduler {
|
|||
private final def extractTask(replaceWith: Runnable): Runnable =
|
||||
task match {
|
||||
case t @ (ExecutedTask | CancelledTask) => t
|
||||
case x => if (unsafe.compareAndSwapObject(this, taskOffset, x, replaceWith): @nowarn("cat=deprecation")) x
|
||||
case x => if (unsafe.compareAndSwapObject(this, taskOffset, x, replaceWith): @nowarn("cat=deprecation")) x
|
||||
else extractTask(replaceWith)
|
||||
}
|
||||
|
||||
private[pekko] final def executeTask(): Boolean = extractTask(ExecutedTask) match {
|
||||
case ExecutedTask | CancelledTask => false
|
||||
case other =>
|
||||
case other =>
|
||||
try {
|
||||
executionContext.execute(other)
|
||||
true
|
||||
|
|
@ -397,7 +397,7 @@ object LightArrayRevolverScheduler {
|
|||
|
||||
override def cancel(): Boolean = extractTask(CancelledTask) match {
|
||||
case ExecutedTask | CancelledTask => false
|
||||
case task: SchedulerTask =>
|
||||
case task: SchedulerTask =>
|
||||
notifyCancellation(task)
|
||||
true
|
||||
case _ => true
|
||||
|
|
|
|||
|
|
@ -79,7 +79,7 @@ class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAcces
|
|||
case null => throw new NullPointerException
|
||||
case x if !t.isInstance(x) => throw new ClassCastException(fqcn + " is not a subtype of " + t)
|
||||
case x: T => x
|
||||
case unexpected =>
|
||||
case unexpected =>
|
||||
throw new IllegalArgumentException(s"Unexpected module field: $unexpected") // will not happen, for exhaustiveness check
|
||||
}
|
||||
}.recover { case i: InvocationTargetException if i.getTargetException ne null => throw i.getTargetException }
|
||||
|
|
|
|||
|
|
@ -169,8 +169,8 @@ private[pekko] class RepointableActorRef(
|
|||
def getChild(name: Iterator[String]): InternalActorRef =
|
||||
if (name.hasNext) {
|
||||
name.next() match {
|
||||
case ".." => getParent.getChild(name)
|
||||
case "" => getChild(name)
|
||||
case ".." => getParent.getChild(name)
|
||||
case "" => getChild(name)
|
||||
case other =>
|
||||
val (childName, uid) = ActorCell.splitNameAndUid(other)
|
||||
lookup.getChildByName(childName) match {
|
||||
|
|
|
|||
|
|
@ -585,7 +585,7 @@ object Scheduler {
|
|||
@tailrec final protected def swap(c: Cancellable): Unit = {
|
||||
get match {
|
||||
case null => if (c != null) c.cancel()
|
||||
case old =>
|
||||
case old =>
|
||||
if (!compareAndSet(old, c))
|
||||
swap(c)
|
||||
}
|
||||
|
|
@ -595,7 +595,7 @@ object Scheduler {
|
|||
@tailrec def tailrecCancel(): Boolean = {
|
||||
get match {
|
||||
case null => false
|
||||
case c =>
|
||||
case c =>
|
||||
if (c.cancel()) compareAndSet(c, null)
|
||||
else compareAndSet(c, null) || tailrecCancel()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -160,7 +160,7 @@ private[pekko] trait StashSupport {
|
|||
private[pekko] val mailbox: DequeBasedMessageQueueSemantics = {
|
||||
actorCell.mailbox.messageQueue match {
|
||||
case queue: DequeBasedMessageQueueSemantics => queue
|
||||
case other =>
|
||||
case other =>
|
||||
throw ActorInitializationException(
|
||||
self,
|
||||
s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" +
|
||||
|
|
|
|||
|
|
@ -167,7 +167,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
} catch { case i: InvocationTargetException => throw i.getTargetException }
|
||||
|
||||
@throws(classOf[ObjectStreamException]) private def writeReplace(): AnyRef = parameters match {
|
||||
case null => SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null)
|
||||
case null => SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, null)
|
||||
case ps if ps.length == 0 =>
|
||||
SerializedMethodCall(method.getDeclaringClass, method.getName, method.getParameterTypes, Array())
|
||||
case ps =>
|
||||
|
|
@ -209,7 +209,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
serializedParameters match {
|
||||
case null => null
|
||||
case a if a.length == 0 => Array[AnyRef]()
|
||||
case a =>
|
||||
case a =>
|
||||
val deserializedParameters: Array[AnyRef] = new Array[AnyRef](a.length) // Mutable for the sake of sanity
|
||||
for (i <- 0 until a.length) {
|
||||
val (sId, manifest, bytes) = a(i)
|
||||
|
|
@ -318,7 +318,7 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
override def preRestart(reason: Throwable, message: Option[Any]): Unit = withContext {
|
||||
me match {
|
||||
case l: PreRestart => l.preRestart(reason, message)
|
||||
case _ =>
|
||||
case _ =>
|
||||
self.context.children
|
||||
.foreach(self.context.stop) // Can't be super.preRestart(reason, message) since that would invoke postStop which would set the actorVar to DL and proxyVar to null
|
||||
}
|
||||
|
|
@ -465,11 +465,11 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
|
|||
@throws(classOf[Throwable])
|
||||
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match {
|
||||
case "toString" => actor.toString
|
||||
case "equals" =>
|
||||
case "equals" =>
|
||||
(args.length == 1 && (proxy eq args(0)) || actor == extension.getActorRefFor(args(0)))
|
||||
.asInstanceOf[AnyRef] // Force boxing of the boolean
|
||||
case "hashCode" => actor.hashCode.asInstanceOf[AnyRef]
|
||||
case _ =>
|
||||
case _ =>
|
||||
implicit val dispatcher = extension.system.dispatcher
|
||||
import pekko.pattern.ask
|
||||
MethodCall(method, args) match {
|
||||
|
|
@ -761,7 +761,7 @@ class TypedActorExtension(val system: ExtendedActorSystem) extends TypedActorFac
|
|||
private[pekko] def invocationHandlerFor(typedActor: AnyRef): TypedActorInvocationHandler =
|
||||
if ((typedActor ne null) && classOf[Proxy].isAssignableFrom(typedActor.getClass) && Proxy.isProxyClass(
|
||||
typedActor.getClass)) typedActor match {
|
||||
case null => null
|
||||
case null => null
|
||||
case other =>
|
||||
Proxy.getInvocationHandler(other) match {
|
||||
case null => null
|
||||
|
|
|
|||
|
|
@ -172,7 +172,7 @@ private[pekko] trait Children { this: ActorCell =>
|
|||
val cc = childrenRefs
|
||||
cc.getByName(ref.path.name) match {
|
||||
case old @ Some(_: ChildRestartStats) => old.asInstanceOf[Option[ChildRestartStats]]
|
||||
case Some(ChildNameReserved) =>
|
||||
case Some(ChildNameReserved) =>
|
||||
val crs = ChildRestartStats(ref)
|
||||
val name = ref.path.name
|
||||
if (swapChildrenRefs(cc, cc.add(name, crs))) Some(crs) else initChild(ref)
|
||||
|
|
@ -270,7 +270,7 @@ private[pekko] trait Children { this: ActorCell =>
|
|||
name match {
|
||||
case null => throw InvalidActorNameException("actor name must not be null")
|
||||
case "" => throw InvalidActorNameException("actor name must not be empty")
|
||||
case _ =>
|
||||
case _ =>
|
||||
ActorPath.validatePathElement(name)
|
||||
name
|
||||
}
|
||||
|
|
|
|||
|
|
@ -204,7 +204,7 @@ private[pekko] object ChildrenContainer {
|
|||
|
||||
override def reserve(name: String): ChildrenContainer = reason match {
|
||||
case Termination => throw new IllegalStateException("cannot reserve actor name '" + name + "': terminating")
|
||||
case _ =>
|
||||
case _ =>
|
||||
if (c contains name)
|
||||
throw InvalidActorNameException(s"actor name [$name] is not unique!")
|
||||
else copy(c = c.updated(name, ChildNameReserved))
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ private[pekko] trait DeathWatch { this: ActorCell =>
|
|||
existenceConfirmed: Boolean,
|
||||
addressTerminated: Boolean): Unit = {
|
||||
watching.get(actor) match {
|
||||
case None => // We're apparently no longer watching this actor.
|
||||
case None => // We're apparently no longer watching this actor.
|
||||
case Some(optionalMessage) =>
|
||||
maintainAddressTerminatedSubscription(actor) {
|
||||
watching -= actor
|
||||
|
|
|
|||
|
|
@ -187,7 +187,7 @@ private[pekko] trait Dispatch { this: ActorCell =>
|
|||
|
||||
unwrappedMessage match {
|
||||
case _: NoSerializationVerificationNeeded => envelope
|
||||
case msg =>
|
||||
case msg =>
|
||||
if (system.settings.NoSerializationVerificationNeededClassPrefix.exists(msg.getClass.getName.startsWith))
|
||||
envelope
|
||||
else {
|
||||
|
|
|
|||
|
|
@ -198,7 +198,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
|
|||
if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) ()
|
||||
else ifSensibleToDoSoThenScheduleShutdown()
|
||||
case RESCHEDULED =>
|
||||
case unexpected =>
|
||||
case unexpected =>
|
||||
throw new IllegalArgumentException(s"Unexpected actor class marker: $unexpected") // will not happen, for exhaustiveness check
|
||||
}
|
||||
}
|
||||
|
|
@ -259,7 +259,7 @@ abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator
|
|||
if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction()
|
||||
else run()
|
||||
case UNSCHEDULED =>
|
||||
case unexpected =>
|
||||
case unexpected =>
|
||||
throw new IllegalArgumentException(s"Unexpected actor class marker: $unexpected") // will not happen, for exhaustiveness check
|
||||
}
|
||||
}
|
||||
|
|
@ -482,7 +482,7 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
|
|||
.map {
|
||||
case "array" => ThreadPoolConfig.arrayBlockingQueue(size, false) // TODO config fairness?
|
||||
case "" | "linked" => ThreadPoolConfig.linkedBlockingQueue(size)
|
||||
case x =>
|
||||
case x =>
|
||||
throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!".format(x))
|
||||
}
|
||||
.map { qf => (q: ThreadPoolConfigBuilder) =>
|
||||
|
|
|
|||
|
|
@ -63,7 +63,7 @@ private[pekko] class CachingConfig(_config: Config) extends Config {
|
|||
val ne = Try { config.hasPath(path) } match {
|
||||
case Failure(_) => invalidPathEntry
|
||||
case Success(false) => nonExistingPathEntry
|
||||
case _ =>
|
||||
case _ =>
|
||||
Try { config.getValue(path) } match {
|
||||
case Failure(_) =>
|
||||
emptyPathEntry
|
||||
|
|
|
|||
|
|
@ -272,7 +272,7 @@ class Dispatchers @InternalApi private[pekko] (
|
|||
"During a migration period you can still use BalancingDispatcher by specifying the full class name: " +
|
||||
classOf[BalancingDispatcherConfigurator].getName)
|
||||
case "PinnedDispatcher" => new PinnedDispatcherConfigurator(cfg, prerequisites)
|
||||
case fqn =>
|
||||
case fqn =>
|
||||
val args = List(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites)
|
||||
prerequisites.dynamicAccess
|
||||
.createInstanceFor[MessageDispatcherConfigurator](fqn, args)
|
||||
|
|
@ -383,7 +383,7 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer
|
|||
|
||||
private val threadPoolConfig: ThreadPoolConfig = configureExecutor() match {
|
||||
case e: ThreadPoolExecutorConfigurator => e.threadPoolConfig
|
||||
case _ =>
|
||||
case _ =>
|
||||
prerequisites.eventStream.publish(
|
||||
Warning(
|
||||
"PinnedDispatcherConfigurator",
|
||||
|
|
|
|||
|
|
@ -81,7 +81,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
|
|||
|
||||
def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t match {
|
||||
case correct: ForkJoinPool.ForkJoinWorkerThreadFactory => correct
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalStateException(
|
||||
"The prerequisites for the ForkJoinExecutorConfigurator is a ForkJoinPool.ForkJoinWorkerThreadFactory!")
|
||||
}
|
||||
|
|
@ -188,7 +188,7 @@ class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrer
|
|||
val asyncMode = config.getString("task-peeking-mode") match {
|
||||
case "FIFO" => true
|
||||
case "LIFO" => false
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(
|
||||
"Cannot instantiate ForkJoinExecutorServiceFactory. " +
|
||||
""""task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO".""")
|
||||
|
|
|
|||
|
|
@ -316,7 +316,7 @@ private[pekko] abstract class Mailbox(val messageQueue: MessageQueue)
|
|||
try dlm.systemEnqueue(actor.self, msg)
|
||||
catch {
|
||||
case e: InterruptedException => interruption = e
|
||||
case NonFatal(e) =>
|
||||
case NonFatal(e) =>
|
||||
actor.system.eventStream.publish(
|
||||
Error(
|
||||
e,
|
||||
|
|
|
|||
|
|
@ -119,7 +119,7 @@ private[pekko] class Mailboxes(
|
|||
case t: ParameterizedType =>
|
||||
t.getActualTypeArguments.head match {
|
||||
case c: Class[_] => c
|
||||
case x =>
|
||||
case x =>
|
||||
throw new IllegalArgumentException(s"no wildcard type allowed in RequireMessageQueue argument (was [$x])")
|
||||
}
|
||||
case unexpected =>
|
||||
|
|
@ -143,7 +143,7 @@ private[pekko] class Mailboxes(
|
|||
case t: ParameterizedType =>
|
||||
t.getActualTypeArguments.head match {
|
||||
case c: Class[_] => c
|
||||
case x =>
|
||||
case x =>
|
||||
throw new IllegalArgumentException(
|
||||
s"no wildcard type allowed in ProducesMessageQueue argument (was [$x])")
|
||||
}
|
||||
|
|
@ -239,7 +239,7 @@ private[pekko] class Mailboxes(
|
|||
val conf = config(id)
|
||||
|
||||
val mailboxType = conf.getString("mailbox-type") match {
|
||||
case "" => throw new ConfigurationException(s"The setting mailbox-type, defined in [$id] is empty")
|
||||
case "" => throw new ConfigurationException(s"The setting mailbox-type, defined in [$id] is empty")
|
||||
case fqcn =>
|
||||
val args = List(classOf[ActorSystem.Settings] -> settings, classOf[Config] -> conf)
|
||||
dynamicAccess
|
||||
|
|
@ -313,7 +313,7 @@ private[pekko] class Mailboxes(
|
|||
val key = dispatcher + "-" + mailbox
|
||||
cache.get(key) match {
|
||||
case Some(value) => value
|
||||
case None =>
|
||||
case None =>
|
||||
val value = stashCapacityFromConfig(dispatcher, mailbox)
|
||||
updateCache(cache, key, value)
|
||||
value
|
||||
|
|
|
|||
|
|
@ -275,7 +275,7 @@ private[pekko] class AffinityPool(
|
|||
@tailrec def runLoop(): Unit =
|
||||
if (!Thread.interrupted()) {
|
||||
(poolState: @switch) match {
|
||||
case Uninitialized => ()
|
||||
case Uninitialized => ()
|
||||
case Initializing | Running =>
|
||||
executeNext()
|
||||
runLoop()
|
||||
|
|
|
|||
|
|
@ -373,7 +373,7 @@ trait ManagedActorClassification { this: ActorEventBus with ActorClassifier =>
|
|||
val current = mappings.get
|
||||
|
||||
current.backing.get(monitored) match {
|
||||
case None => false
|
||||
case None => false
|
||||
case Some(monitors) =>
|
||||
val removed = current.remove(monitored, monitor)
|
||||
val removedMonitors = removed.get(monitored)
|
||||
|
|
|
|||
|
|
@ -1002,7 +1002,7 @@ object Logging {
|
|||
case e: Warning => warning(e)
|
||||
case e: Info => info(e)
|
||||
case e: Debug => debug(e)
|
||||
case e =>
|
||||
case e =>
|
||||
warning(Warning(simpleName(this), this.getClass, "received unexpected event of class " + e.getClass + ": " + e))
|
||||
}
|
||||
|
||||
|
|
@ -1154,7 +1154,7 @@ object Logging {
|
|||
def stackTraceFor(e: Throwable): String = e match {
|
||||
case null | Error.NoCause => ""
|
||||
case _: NoStackTrace => s" (${e.getClass.getName}: ${e.getMessage})"
|
||||
case other =>
|
||||
case other =>
|
||||
val sw = new java.io.StringWriter
|
||||
val pw = new java.io.PrintWriter(sw)
|
||||
pw.append('\n')
|
||||
|
|
|
|||
|
|
@ -38,7 +38,7 @@ class JavaLogger extends Actor with RequiresMessageQueue[LoggerMessageQueueSeman
|
|||
case event: Warning => log(mapLevel(event.level), null, event)
|
||||
case event: Info => log(mapLevel(event.level), null, event)
|
||||
case event: Debug => log(mapLevel(event.level), null, event)
|
||||
case InitializeLogger(_) =>
|
||||
case InitializeLogger(_) =>
|
||||
Logger(this.getClass.getName)
|
||||
.warning(s"${getClass.getName} has been deprecated since Akka 2.6.0. Use SLF4J instead.")
|
||||
sender() ! LoggerInitialized
|
||||
|
|
|
|||
|
|
@ -99,7 +99,7 @@ class InetAddressDnsResolver(cache: SimpleDnsCache, config: Config) extends Acto
|
|||
case "default" => if (positive) defaultCachePolicy else defaultNegativeCachePolicy
|
||||
case "forever" => Forever
|
||||
case "never" => Never
|
||||
case _ => {
|
||||
case _ => {
|
||||
val finiteTtl = config
|
||||
.getDuration(path, TimeUnit.SECONDS)
|
||||
.requiring(_ > 0, s"pekko.io.dns.$path must be 'default', 'forever', 'never' or positive duration")
|
||||
|
|
@ -131,7 +131,7 @@ class InetAddressDnsResolver(cache: SimpleDnsCache, config: Config) extends Acto
|
|||
case r @ DnsProtocol.Resolve(name, ip @ Ip(ipv4, ipv6)) =>
|
||||
val answer = cache.cached(r) match {
|
||||
case Some(a) => a
|
||||
case None =>
|
||||
case None =>
|
||||
log.debug("Request for [{}] was not yet cached", name)
|
||||
try {
|
||||
val addresses: Array[InetAddress] = InetAddress.getAllByName(name)
|
||||
|
|
@ -153,7 +153,7 @@ class InetAddressDnsResolver(cache: SimpleDnsCache, config: Config) extends Acto
|
|||
// no where in pekko now sends this message, but supported until Dns.Resolve/Resolved have been removed
|
||||
val answer: Dns.Resolved = cache.cached(name) match {
|
||||
case Some(a) => a
|
||||
case None =>
|
||||
case None =>
|
||||
try {
|
||||
val addresses = InetAddress.getAllByName(name)
|
||||
// respond with the old protocol as the request was the new protocol
|
||||
|
|
|
|||
|
|
@ -289,14 +289,14 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
|
||||
AllRead
|
||||
case -1 => EndOfStream
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalStateException("Unexpected value returned from read: " + readBytes)
|
||||
}
|
||||
} else MoreDataWaiting
|
||||
|
||||
val buffer = bufferPool.acquire()
|
||||
try innerRead(buffer, ReceivedMessageSizeLimit) match {
|
||||
case AllRead => // nothing to do
|
||||
case AllRead => // nothing to do
|
||||
case MoreDataWaiting =>
|
||||
if (!pullMode) self ! ChannelReadable
|
||||
case EndOfStream if channel.socket.isOutputShutdown =>
|
||||
|
|
@ -438,14 +438,14 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
|
|||
def PendingWrite(commander: ActorRef, write: WriteCommand): PendingWrite = {
|
||||
@tailrec def create(head: WriteCommand, tail: WriteCommand): PendingWrite =
|
||||
head match {
|
||||
case Write.empty => if (tail eq Write.empty) EmptyPendingWrite else create(tail, Write.empty)
|
||||
case Write(data, ack) if data.nonEmpty => PendingBufferWrite(commander, data, ack, tail)
|
||||
case Write.empty => if (tail eq Write.empty) EmptyPendingWrite else create(tail, Write.empty)
|
||||
case Write(data, ack) if data.nonEmpty => PendingBufferWrite(commander, data, ack, tail)
|
||||
case WriteFile(path, offset, count, ack) =>
|
||||
PendingWriteFile(commander, Paths.get(path), offset, count, ack, tail)
|
||||
case WritePath(path, offset, count, ack) =>
|
||||
PendingWriteFile(commander, path, offset, count, ack, tail)
|
||||
case CompoundWrite(h, t) => create(h, t)
|
||||
case x @ Write(_, ack) => // empty write with either an ACK or a non-standard NoACK
|
||||
case x @ Write(_, ack) => // empty write with either an ACK or a non-standard NoACK
|
||||
if (x.wantsAck) commander ! ack
|
||||
create(tail, Write.empty)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -109,7 +109,7 @@ private[io] class UdpListener(val udp: UdpExt, channelRegistry: ChannelRegistry,
|
|||
buffer.flip()
|
||||
handler ! Received(ByteString(buffer), sender)
|
||||
if (readsLeft > 0) innerReceive(readsLeft - 1, buffer)
|
||||
case null => // null means no data was available
|
||||
case null => // null means no data was available
|
||||
case unexpected =>
|
||||
throw new RuntimeException(s"Unexpected address in buffer: $unexpected") // will not happen, for exhaustiveness check
|
||||
}
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ private[dns] final class DnsSettings(system: ExtendedActorSystem, c: Config) {
|
|||
case "thread-local-random" => Policy.ThreadLocalRandom
|
||||
case "secure-random" => Policy.SecureRandom
|
||||
case s if s.isEmpty | s == "enhanced-double-hash-random" => Policy.EnhancedDoubleHashRandom
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalArgumentException(
|
||||
"Invalid value for id-generator-policy, id-generator-policy must be 'thread-local-random', 'secure-random' or" +
|
||||
s"`enhanced-double-hash-random`")
|
||||
|
|
@ -100,7 +100,7 @@ private[dns] final class DnsSettings(system: ExtendedActorSystem, c: Config) {
|
|||
c.getString(path) match {
|
||||
case "forever" => Forever
|
||||
case "never" => Never
|
||||
case _ =>
|
||||
case _ =>
|
||||
val finiteTtl = c
|
||||
.getDuration(path)
|
||||
.requiring(!_.isNegative, s"pekko.io.dns.$path must be 'default', 'forever', 'never' or positive duration")
|
||||
|
|
@ -115,7 +115,7 @@ private[dns] final class DnsSettings(system: ExtendedActorSystem, c: Config) {
|
|||
} else if (etcResolvConf.exists()) {
|
||||
val parsed = ResolvConfParser.parseFile(etcResolvConf)
|
||||
parsed match {
|
||||
case Success(value) => Some(value)
|
||||
case Success(value) => Some(value)
|
||||
case Failure(exception) =>
|
||||
val log = Logging(system, classOf[DnsSettings])
|
||||
if (log.isWarningEnabled) {
|
||||
|
|
@ -144,7 +144,7 @@ private[dns] final class DnsSettings(system: ExtendedActorSystem, c: Config) {
|
|||
case ConfigValueType.STRING =>
|
||||
c.getString("ndots") match {
|
||||
case "default" => resolvConf.map(_.ndots).getOrElse(1)
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw new IllegalArgumentException("Invalid value for ndots. Must be the string 'default' or an integer.")
|
||||
}
|
||||
case ConfigValueType.NUMBER =>
|
||||
|
|
|
|||
|
|
@ -269,7 +269,7 @@ object Util {
|
|||
def immutableSeq[T](iterable: java.lang.Iterable[T]): immutable.Seq[T] =
|
||||
iterable match {
|
||||
case imm: immutable.Seq[_] => imm.asInstanceOf[immutable.Seq[T]]
|
||||
case other =>
|
||||
case other =>
|
||||
val i = other.iterator()
|
||||
if (i.hasNext) {
|
||||
val builder = new immutable.VectorBuilder[T]
|
||||
|
|
|
|||
|
|
@ -631,7 +631,7 @@ private[pekko] final class PromiseActorRef(
|
|||
override def sendSystemMessage(message: SystemMessage): Unit = message match {
|
||||
case _: Terminate => stop()
|
||||
case DeathWatchNotification(a, ec, at) => this.!(Terminated(a)(existenceConfirmed = ec, addressTerminated = at))
|
||||
case Watch(watchee, watcher) =>
|
||||
case Watch(watchee, watcher) =>
|
||||
if (watchee == this && watcher != this) {
|
||||
if (!addWatcher(watcher))
|
||||
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
|
||||
|
|
|
|||
|
|
@ -101,7 +101,7 @@ trait FutureTimeoutSupport {
|
|||
}
|
||||
future.value match {
|
||||
case Some(_) => future
|
||||
case None => // not completed yet
|
||||
case None => // not completed yet
|
||||
val p = Promise[T]()
|
||||
val timeout = using.scheduleOnce(duration) {
|
||||
p.tryFailure(new TimeoutException(s"Timeout of $duration expired"))
|
||||
|
|
|
|||
|
|
@ -176,7 +176,7 @@ object StatusReply {
|
|||
s match {
|
||||
case StatusReply.Success(v) => ScalaSuccess(v.asInstanceOf[T])
|
||||
case StatusReply.Error(ex) => ScalaFailure[T](ex)
|
||||
case unexpected =>
|
||||
case unexpected =>
|
||||
ScalaFailure(new IllegalArgumentException(s"Unexpected status reply success value: $unexpected"))
|
||||
}
|
||||
case fail @ ScalaFailure(_) => fail.asInstanceOf[Try[T]]
|
||||
|
|
|
|||
|
|
@ -107,7 +107,7 @@ import pekko.pattern.{
|
|||
case None =>
|
||||
finalStopMessage match {
|
||||
case Some(fsm) if fsm(msg) => context.stop(self)
|
||||
case _ =>
|
||||
case _ =>
|
||||
handlingWhileStopped match {
|
||||
case ForwardDeathLetters => context.system.deadLetters.forward(msg)
|
||||
case ForwardTo(h) => h.forward(msg)
|
||||
|
|
|
|||
|
|
@ -232,7 +232,7 @@ final case class ConsistentHashingRoutingLogic(
|
|||
case bytes: Array[Byte] => currentConsistenHash.nodeFor(bytes).routee
|
||||
case str: String => currentConsistenHash.nodeFor(str).routee
|
||||
case x: AnyRef => currentConsistenHash.nodeFor(SerializationExtension(system).serialize(x).get).routee
|
||||
case unexpected =>
|
||||
case unexpected =>
|
||||
throw new IllegalArgumentException(s"Unexpected hashdata: $unexpected") // will not happen, for exhaustiveness check
|
||||
}
|
||||
} catch {
|
||||
|
|
@ -244,7 +244,7 @@ final case class ConsistentHashingRoutingLogic(
|
|||
message match {
|
||||
case _ if hashMapping.isDefinedAt(message) => target(hashMapping(message))
|
||||
case hashable: ConsistentHashable => target(hashable.consistentHashKey)
|
||||
case _ =>
|
||||
case _ =>
|
||||
log.warning(
|
||||
"Message [{}] must be handled by hashMapping, or implement [{}] or be wrapped in [{}]",
|
||||
message.getClass.getName,
|
||||
|
|
|
|||
|
|
@ -42,8 +42,8 @@ trait Listeners { self: Actor =>
|
|||
* {{{ def receive = listenerManagement orElse … }}}
|
||||
*/
|
||||
protected def listenerManagement: Actor.Receive = {
|
||||
case Listen(l) => listeners.add(l)
|
||||
case Deafen(l) => listeners.remove(l)
|
||||
case Listen(l) => listeners.add(l)
|
||||
case Deafen(l) => listeners.remove(l)
|
||||
case WithListeners(f) =>
|
||||
val i = listeners.iterator
|
||||
while (i.hasNext) f(i.next)
|
||||
|
|
|
|||
|
|
@ -73,7 +73,7 @@ object Resizer {
|
|||
case (true, false) => Some(DefaultResizer(defaultResizerConfig))
|
||||
case (false, true) => Some(OptimalSizeExploringResizer(metricsBasedResizerConfig))
|
||||
case (false, false) => None
|
||||
case (true, true) =>
|
||||
case (true, true) =>
|
||||
throw new ResizerInitializationException(s"cannot enable both resizer and optimal-size-exploring-resizer", null)
|
||||
}
|
||||
}
|
||||
|
|
@ -336,7 +336,7 @@ private[pekko] class ResizablePoolActor(supervisorStrategy: SupervisorStrategy)
|
|||
|
||||
val resizerCell = context match {
|
||||
case x: ResizablePoolCell => x
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw ActorInitializationException(
|
||||
"Resizable router actor can only be used when resizer is defined, not in " + context.getClass)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -159,7 +159,7 @@ private[pekko] class RoutedActorCell(
|
|||
private[pekko] class RouterActor extends Actor {
|
||||
val cell = context match {
|
||||
case x: RoutedActorCell => x
|
||||
case _ =>
|
||||
case _ =>
|
||||
throw ActorInitializationException("Router actor can only be used in RoutedActorRef, not in " + context.getClass)
|
||||
}
|
||||
|
||||
|
|
@ -198,7 +198,7 @@ private[pekko] class RouterPoolActor(override val supervisorStrategy: Supervisor
|
|||
|
||||
val pool = cell.routerConfig match {
|
||||
case x: Pool => x
|
||||
case other =>
|
||||
case other =>
|
||||
throw ActorInitializationException("RouterPoolActor can only be used with Pool, not " + other.getClass)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -77,7 +77,7 @@ object Serialization {
|
|||
Serialization.currentTransportInformation.value match {
|
||||
case null =>
|
||||
originalSystem match {
|
||||
case null => path.toSerializationFormat
|
||||
case null => path.toSerializationFormat
|
||||
case system =>
|
||||
try path.toSerializationFormatWithAddress(system.provider.getDefaultAddress)
|
||||
catch { case NonFatal(_) => path.toSerializationFormat }
|
||||
|
|
@ -228,14 +228,14 @@ class Serialization(val system: ExtendedActorSystem) extends Extension {
|
|||
withTransportInformation { () =>
|
||||
serializer match {
|
||||
case s2: SerializerWithStringManifest => s2.fromBinary(bytes, manifest)
|
||||
case s1 =>
|
||||
case s1 =>
|
||||
if (manifest == "")
|
||||
s1.fromBinary(bytes, None)
|
||||
else {
|
||||
val cache = manifestCache.get
|
||||
cache.get(manifest) match {
|
||||
case Some(cachedClassManifest) => s1.fromBinary(bytes, cachedClassManifest)
|
||||
case None =>
|
||||
case None =>
|
||||
system.dynamicAccess.getClassFor[AnyRef](manifest) match {
|
||||
case Success(classManifest) =>
|
||||
val classManifestOption: Option[Class[_]] = Some(classManifest)
|
||||
|
|
|
|||
|
|
@ -439,7 +439,7 @@ class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerialize
|
|||
def toBinary(o: AnyRef): Array[Byte] = o match {
|
||||
case null => null
|
||||
case o: Array[Byte] => o
|
||||
case other =>
|
||||
case other =>
|
||||
throw new IllegalArgumentException(
|
||||
s"${getClass.getName} only serializes byte arrays, not [${other.getClass.getName}]")
|
||||
}
|
||||
|
|
@ -451,7 +451,7 @@ class ByteArraySerializer(val system: ExtendedActorSystem) extends BaseSerialize
|
|||
o match {
|
||||
case null =>
|
||||
case bytes: Array[Byte] => buf.put(bytes)
|
||||
case other =>
|
||||
case other =>
|
||||
throw new IllegalArgumentException(
|
||||
s"${getClass.getName} only serializes byte arrays, not [${other.getClass.getName}]")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,7 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
|
|||
with BlockingQueue[E] {
|
||||
|
||||
backing match {
|
||||
case null => throw new IllegalArgumentException("Backing Queue may not be null")
|
||||
case null => throw new IllegalArgumentException("Backing Queue may not be null")
|
||||
case b: BlockingQueue[_] =>
|
||||
require(maxCapacity > 0)
|
||||
require(b.size() == 0)
|
||||
|
|
@ -119,7 +119,7 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
|
|||
backing.poll() match {
|
||||
case null if remainingNanos <= 0 => null.asInstanceOf[E]
|
||||
case null => pollElement(notEmpty.awaitNanos(remainingNanos))
|
||||
case e => {
|
||||
case e => {
|
||||
notFull.signal()
|
||||
e
|
||||
}
|
||||
|
|
@ -134,7 +134,7 @@ class BoundedBlockingQueue[E <: AnyRef](val maxCapacity: Int, private val backin
|
|||
try {
|
||||
backing.poll() match {
|
||||
case null => null.asInstanceOf[E]
|
||||
case e =>
|
||||
case e =>
|
||||
notFull.signal()
|
||||
e
|
||||
}
|
||||
|
|
|
|||
|
|
@ -43,7 +43,7 @@ object HashCode {
|
|||
case value: Float => hash(seed, value)
|
||||
case value: Double => hash(seed, value)
|
||||
case value: Byte => hash(seed, value)
|
||||
case value: AnyRef =>
|
||||
case value: AnyRef =>
|
||||
var result = seed
|
||||
if (value eq null) result = hash(result, 0)
|
||||
else if (!isArray(value)) result = hash(result, value.hashCode())
|
||||
|
|
|
|||
|
|
@ -86,7 +86,7 @@ class TypedMultiMap[T <: AnyRef, K[_ <: T]] private (private val map: Map[T, Set
|
|||
*/
|
||||
def removed(key: T)(value: K[key.type]): TypedMultiMap[T, K] = {
|
||||
map.get(key) match {
|
||||
case None => this
|
||||
case None => this
|
||||
case Some(set) =>
|
||||
if (set(value)) {
|
||||
val newset = set - value
|
||||
|
|
|
|||
|
|
@ -80,10 +80,10 @@ private[pekko] final case class WildcardTree[T](
|
|||
val nextElement = elems.next()
|
||||
children.get(nextElement) match {
|
||||
case Some(branch) => branch.findWithSingleWildcard(elems)
|
||||
case None =>
|
||||
case None =>
|
||||
children.get("*") match {
|
||||
case Some(branch) => branch.findWithSingleWildcard(elems)
|
||||
case None =>
|
||||
case None =>
|
||||
val maybeWildcardSuffixMatchingBranch = wildcardSuffixChildren.collectFirst {
|
||||
case (key, branch) if nextElement.startsWith(key) => branch
|
||||
}
|
||||
|
|
@ -103,7 +103,7 @@ private[pekko] final case class WildcardTree[T](
|
|||
val newAlt = children.getOrElse("**", alt)
|
||||
children.get(elems.next()) match {
|
||||
case Some(branch) => branch.findWithTerminalDoubleWildcard(elems, newAlt)
|
||||
case None =>
|
||||
case None =>
|
||||
children.get("*") match {
|
||||
case Some(branch) => branch.findWithTerminalDoubleWildcard(elems, newAlt)
|
||||
case None => newAlt
|
||||
|
|
|
|||
|
|
@ -51,7 +51,7 @@ class AffinityPoolComparativeBenchmark {
|
|||
requireRightNumberOfCores(cores)
|
||||
|
||||
val mailboxConf = mailbox match {
|
||||
case "default" => ""
|
||||
case "default" => ""
|
||||
case "SingleConsumerOnlyUnboundedMailbox" =>
|
||||
s"""default-mailbox.mailbox-type = "${classOf[pekko.dispatch.SingleConsumerOnlyUnboundedMailbox].getName}""""
|
||||
}
|
||||
|
|
|
|||
|
|
@ -55,7 +55,7 @@ class AffinityPoolRequestResponseBenchmark {
|
|||
requireRightNumberOfCores(cores)
|
||||
|
||||
val mailboxConf = mailbox match {
|
||||
case "default" => ""
|
||||
case "default" => ""
|
||||
case "SingleConsumerOnlyUnboundedMailbox" =>
|
||||
s"""default-mailbox.mailbox-type = "${classOf[pekko.dispatch.SingleConsumerOnlyUnboundedMailbox].getName}""""
|
||||
}
|
||||
|
|
|
|||
Some files were not shown because too many files have changed in this diff Show more
Loading…
Add table
Add a link
Reference in a new issue