format source with scalafmt, #2

This commit is contained in:
Auto Format 2022-11-03 09:46:22 +01:00 committed by Matthew de Detrich
parent 0e876025e8
commit 15b163da74
980 changed files with 8776 additions and 8578 deletions

View file

@ -166,7 +166,8 @@ private[akka] final class FunctionRef[-T](override val path: ActorPath, send: (T
val i = new BehaviorTestKitImpl[U](system, p, BehaviorImpl.ignore)
_children += p.name -> i
new FunctionRef[U](p, (message, _) => {
new FunctionRef[U](p,
(message, _) => {
val m = f(message);
if (m != null) {
selfInbox.ref ! m; i.selfInbox().ref ! message

View file

@ -134,7 +134,8 @@ private[akka] final class TestProbeImpl[M](name: String, system: ActorSystem[_])
val prevEnd = end
end = start + maxDiff
val ret = try f
val ret =
try f
finally end = prevEnd
val diff = now - start

View file

@ -21,7 +21,6 @@ package object scaladsl {
*
* Uses the scaling factor from the `TestTimeFactor` in the [[TestKitSettings]]
* (in implicit scope).
*
*/
implicit class TestDuration(val duration: FiniteDuration) extends AnyVal {
def dilated(implicit settings: TestKitSettings): FiniteDuration = settings.dilated(duration)

View file

@ -67,11 +67,11 @@ class TestAppenderSpec
"only filter events for given logger name" in {
val count = new AtomicInteger
LoggingTestKit
.custom({
.custom {
case logEvent =>
count.incrementAndGet()
logEvent.message == "Hello from right logger" && logEvent.loggerName == classOf[AnotherLoggerClass].getName
})
}
.withOccurrences(2)
.withLoggerName(classOf[AnotherLoggerClass].getName)
.expect {

View file

@ -22,9 +22,9 @@ class TestProbeSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
probe.fishForMessage(shortDuration) {
case _ => FishingOutcomes.complete
}
probe.awaitAssert({
probe.awaitAssert {
"result"
})
}
probe.expectMessageType[String]
probe.expectMessage("whoa")
probe.expectNoMessage()

View file

@ -159,8 +159,8 @@ class ActorRefSpec extends AkkaSpec("""
EventFilter[ActorInitializationException](occurrences = 1).intercept {
intercept[akka.actor.ActorInitializationException] {
wrap(
result => actorOf(Props(promiseIntercept(new FailingOuterActor(actorOf(Props(new InnerActor))))(result))))
wrap(result =>
actorOf(Props(promiseIntercept(new FailingOuterActor(actorOf(Props(new InnerActor))))(result))))
}
contextStackMustBeEmpty()
@ -168,8 +168,8 @@ class ActorRefSpec extends AkkaSpec("""
EventFilter[ActorInitializationException](occurrences = 1).intercept {
intercept[akka.actor.ActorInitializationException] {
wrap(
result => actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result)))))))
wrap(result =>
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept(new FailingInnerActor)(result)))))))
}
contextStackMustBeEmpty()
@ -196,8 +196,7 @@ class ActorRefSpec extends AkkaSpec("""
EventFilter[ActorInitializationException](occurrences = 2).intercept {
intercept[akka.actor.ActorInitializationException] {
wrap(
result =>
wrap(result =>
actorOf(Props(new FailingInheritingOuterActor(
actorOf(Props(promiseIntercept(new FailingInheritingInnerActor)(result)))))))
}
@ -247,22 +246,21 @@ class ActorRefSpec extends AkkaSpec("""
EventFilter[ActorInitializationException](occurrences = 1).intercept {
intercept[akka.actor.ActorInitializationException] {
wrap(
result =>
wrap(result =>
actorOf(
Props(new OuterActor(actorOf(Props(promiseIntercept({ new InnerActor; new InnerActor })(result)))))))
Props(new OuterActor(actorOf(Props(promiseIntercept { new InnerActor; new InnerActor }(result)))))))
}
contextStackMustBeEmpty()
}
EventFilter[ActorInitializationException](occurrences = 1).intercept {
(intercept[java.lang.IllegalStateException] {
intercept[java.lang.IllegalStateException] {
wrap(result =>
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept({
actorOf(Props(new OuterActor(actorOf(Props(promiseIntercept {
throw new IllegalStateException("Ur state be b0rked")
})(result)))))))
}).getMessage should ===("Ur state be b0rked")
}(result)))))))
}.getMessage should ===("Ur state be b0rked")
contextStackMustBeEmpty()
}
@ -272,9 +270,9 @@ class ActorRefSpec extends AkkaSpec("""
EventFilter[ActorInitializationException](occurrences = 1, pattern = "/user/failingActor:").intercept {
intercept[java.lang.IllegalStateException] {
wrap(result =>
system.actorOf(Props(promiseIntercept({
system.actorOf(Props(promiseIntercept {
throw new IllegalStateException
})(result)), "failingActor"))
}(result)), "failingActor"))
}
}
}
@ -325,9 +323,9 @@ class ActorRefSpec extends AkkaSpec("""
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
(intercept[java.lang.IllegalStateException] {
intercept[java.lang.IllegalStateException] {
in.readObject
}).getMessage should ===(
}.getMessage should ===(
"Trying to deserialize a serialized ActorRef without an ActorSystem in scope." +
" Use 'akka.serialization.JavaSerializer.currentSystem.withValue(system) { ... }'")
}
@ -422,8 +420,8 @@ class ActorRefSpec extends AkkaSpec("""
}
}))
val ffive = (ref.ask(5)(timeout)).mapTo[String]
val fnull = (ref.ask(0)(timeout)).mapTo[String]
val ffive = ref.ask(5)(timeout).mapTo[String]
val fnull = ref.ask(0)(timeout).mapTo[String]
ref ! PoisonPill
Await.result(ffive, timeout.duration) should ===("five")
@ -466,8 +464,8 @@ class ActorRefSpec extends AkkaSpec("""
def receive = { case name: String => sender() ! context.child(name).isDefined }
}), "parent")
assert(Await.result((parent ? "child"), timeout.duration) === true)
assert(Await.result((parent ? "whatnot"), timeout.duration) === false)
assert(Await.result(parent ? "child", timeout.duration) === true)
assert(Await.result(parent ? "whatnot", timeout.duration) === false)
}
}
}

View file

@ -49,7 +49,8 @@ class ActorSelectionSpec extends AkkaSpec with DefaultTimeout {
val root = sysImpl.lookupRoot
def empty(path: String) =
new EmptyLocalActorRef(sysImpl.provider, path match {
new EmptyLocalActorRef(sysImpl.provider,
path match {
case RelativeActorPath(elems) => sysImpl.lookupRoot.path / elems
case _ => throw new RuntimeException()
}, system.eventStream)
@ -128,7 +129,7 @@ class ActorSelectionSpec extends AkkaSpec with DefaultTimeout {
val a2 = system.actorOf(p, name)
a2.path should ===(a1.path)
a2.path.toString should ===(a1.path.toString)
a2 should not be (a1)
a2 should not be a1
a2.toString should not be (a1.toString)
watch(a2)

View file

@ -24,7 +24,7 @@ class ActorTimeoutSpec extends AkkaSpec {
"use implicitly supplied timeout" in {
implicit val timeout = Timeout(testTimeout)
val echo = system.actorOf(Props.empty)
val f = (echo ? "hallo")
val f = echo ? "hallo"
intercept[AskTimeoutException] { Await.result(f, testTimeout + leeway) }
}

View file

@ -79,7 +79,8 @@ class CoordinatedShutdownSpec
// a, b can be in any order
result2.toSet should ===(Set("a", "b", "c"))
checkTopologicalSort(Map("b" -> phase("a"), "c" -> phase("b"), "d" -> phase("b", "c"), "e" -> phase("d"))) should ===(
checkTopologicalSort(Map("b" -> phase("a"), "c" -> phase("b"), "d" -> phase("b", "c"),
"e" -> phase("d"))) should ===(
List("a", "b", "c", "d", "e"))
val result3 =
@ -324,7 +325,8 @@ class CoordinatedShutdownSpec
val shouldBeCancelled = cancellables.zipWithIndex.collect {
case (c, i) if i % 2 == 0 => c
}
val cancelFutures = for {
val cancelFutures =
for {
_ <- cancellables
c <- shouldBeCancelled
} yield Future {
@ -785,7 +787,7 @@ class CoordinatedShutdownSpec
withSystemRunning(newSystem, cs)
TestKit.shutdownActorSystem(newSystem)
shutdownHooks should have size (0)
shutdownHooks should have size 0
protected def myHooksCount: Int = synchronized(shutdownHooks.size)
}

View file

@ -47,7 +47,8 @@ class DynamicAccessSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll
}
"try different constructors with recoverWith" in {
instantiateWithDefaultOrStringCtor("akka.actor.TestClassWithStringConstructor").get.name shouldBe "string ctor argument"
instantiateWithDefaultOrStringCtor(
"akka.actor.TestClassWithStringConstructor").get.name shouldBe "string ctor argument"
instantiateWithDefaultOrStringCtor("akka.actor.TestClassWithDefaultConstructor").get.name shouldBe "default"
instantiateWithDefaultOrStringCtor("akka.actor.foo.NonExistingClass") match {
case Failure(t) =>

View file

@ -44,7 +44,7 @@ object FSMActorSpec {
soFar + digit match {
case incomplete if incomplete.length < code.length =>
stay().using(CodeState(incomplete, code))
case codeTry if (codeTry == code) => {
case codeTry if codeTry == code => {
doUnlock()
goto(Open).using(CodeState("", code)).forMax(timeout)
}

View file

@ -25,7 +25,7 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
"be able to become multiple times in its constructor" in {
val a = system.actorOf(Props(new Becomer {
for (i <- 1 to 4) context.become({ case always => sender() ! s"$i:$always" })
for (i <- 1 to 4) context.become { case always => sender() ! s"$i:$always" }
def receive = { case _ => sender() ! "FAILURE" }
}))
a ! "pigdog"
@ -62,7 +62,7 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
val a = system.actorOf(Props(new Actor {
def receive = {
case "init" => sender() ! "init"
case "swap" => context.become({ case x: String => context.sender() ! x })
case "swap" => context.become { case x: String => context.sender() ! x }
}
}))
@ -78,10 +78,10 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
def receive = {
case "init" => sender() ! "init"
case "swap" =>
context.become({
context.become {
case "swapped" => sender() ! "swapped"
case "revert" => context.unbecome()
})
}
}
}))
@ -103,11 +103,11 @@ class HotSwapSpec extends AkkaSpec with ImplicitSender {
def receive = {
case "state" => sender() ! "0"
case "swap" =>
context.become({
context.become {
case "state" => sender() ! "1"
case "swapped" => sender() ! "swapped"
case "crash" => throw new Exception("Crash (expected)!")
})
}
sender() ! "swapped"
}
}))

View file

@ -240,7 +240,8 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
"stop continuous scheduling if the task throws exception" taggedAs TimingTest in {
EventFilter[Exception]("TEST", occurrences = 1).intercept {
val count = new AtomicInteger(0)
collectCancellable(scheduleAdapter.schedule(Duration.Zero, 20.millis, () => {
collectCancellable(scheduleAdapter.schedule(Duration.Zero, 20.millis,
() => {
val c = count.incrementAndGet()
testActor ! c
if (c == 3) throw new RuntimeException("TEST") with NoStackTrace
@ -256,7 +257,8 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
// when first throws
EventFilter[Exception]("TEST-1", occurrences = 1).intercept {
val count1 = new AtomicInteger(0)
collectCancellable(scheduleAdapter.schedule(Duration.Zero, 20.millis, () => {
collectCancellable(scheduleAdapter.schedule(Duration.Zero, 20.millis,
() => {
val c = count1.incrementAndGet()
if (c == 1)
throw new IllegalStateException("TEST-1") with NoStackTrace
@ -269,7 +271,8 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
// when later
EventFilter[Exception]("TEST-3", occurrences = 1).intercept {
val count2 = new AtomicInteger(0)
collectCancellable(scheduleAdapter.schedule(Duration.Zero, 20.millis, () => {
collectCancellable(scheduleAdapter.schedule(Duration.Zero, 20.millis,
() => {
val c = count2.incrementAndGet()
testActor ! c
if (c == 3) throw new IllegalStateException("TEST-3") with NoStackTrace
@ -286,7 +289,8 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
val initialDelay = 200.millis.dilated
val delay = 10.millis.dilated
val timeout = collectCancellable(scheduleAdapter.schedule(initialDelay, delay, () => {
val timeout = collectCancellable(scheduleAdapter.schedule(initialDelay, delay,
() => {
ticks.incrementAndGet()
}))
Thread.sleep(10.millis.dilated.toMillis)
@ -301,7 +305,8 @@ trait SchedulerSpec extends BeforeAndAfterEach with DefaultTimeout with Implicit
val initialDelay = 90.millis.dilated
val delay = 500.millis.dilated
val timeout = collectCancellable(scheduleAdapter.schedule(initialDelay, delay, () => {
val timeout = collectCancellable(scheduleAdapter.schedule(initialDelay, delay,
() => {
ticks.incrementAndGet()
}))
Thread.sleep((initialDelay + 200.millis.dilated).toMillis)
@ -473,7 +478,8 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
val counter = new AtomicInteger
val terminated = Future {
var rounds = 0
while (Try(sched.scheduleOnce(Duration.Zero, new Scheduler.TaskRunOnClose {
while (Try(sched.scheduleOnce(Duration.Zero,
new Scheduler.TaskRunOnClose {
override def run(): Unit = ()
})(localEC)).isSuccess) {
Thread.sleep(1)
@ -485,7 +491,8 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
def delay = if (ThreadLocalRandom.current.nextBoolean) step * 2 else step
val N = 1000000
(1 to N).foreach(_ =>
sched.scheduleOnce(delay, new Scheduler.TaskRunOnClose {
sched.scheduleOnce(delay,
new Scheduler.TaskRunOnClose {
override def run(): Unit = counter.incrementAndGet()
}))
sched.close()
@ -614,7 +621,8 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
var overrun = headroom
val cap = 1000000
val (success, failure) = Iterator
.continually(Try(sched.scheduleOnce(100.millis, new Scheduler.TaskRunOnClose {
.continually(Try(sched.scheduleOnce(100.millis,
new Scheduler.TaskRunOnClose {
override def run(): Unit = counter.incrementAndGet()
})))
.take(cap)
@ -632,7 +640,8 @@ class LightArrayRevolverSchedulerSpec extends AkkaSpec(SchedulerSpec.testConfRev
import system.dispatcher
val counter = new AtomicInteger()
sched.scheduleOnce(10.seconds)(counter.incrementAndGet())
sched.scheduleOnce(10.seconds, new Scheduler.TaskRunOnClose {
sched.scheduleOnce(10.seconds,
new Scheduler.TaskRunOnClose {
override def run(): Unit = counter.incrementAndGet()
})
driver.close()

View file

@ -170,7 +170,7 @@ object SupervisorHierarchySpec {
val sizes = s / kids
var rest = s % kids
val propsTemplate = Props.empty.withDispatcher("hierarchy")
(1 to kids).iterator.map { (id) =>
(1 to kids).iterator.map { id =>
val kidSize = if (rest > 0) {
rest -= 1; sizes + 1
} else sizes
@ -821,7 +821,8 @@ class SupervisorHierarchySpec extends AkkaSpec(SupervisorHierarchySpec.config) w
"suspend children while failing" taggedAs LongRunningTest in {
val latch = TestLatch()
val slowResumer = system.actorOf(Props(new Actor {
val slowResumer = system.actorOf(
Props(new Actor {
override def supervisorStrategy = OneForOneStrategy() {
case _ => Await.ready(latch, 4.seconds.dilated); SupervisorStrategy.Resume
}

View file

@ -439,10 +439,10 @@ class SupervisorSpec
"not lose system messages when a NonFatal exception occurs when processing a system message" in {
val parent = system.actorOf(Props(new Actor {
override val supervisorStrategy = OneForOneStrategy()({
override val supervisorStrategy = OneForOneStrategy() {
case e: IllegalStateException if e.getMessage == "OHNOES" => throw e
case _ => SupervisorStrategy.Restart
})
}
val child = context.watch(context.actorOf(Props(new Actor {
override def postRestart(reason: Throwable): Unit = testActor ! "child restarted"
def receive = {

View file

@ -27,16 +27,16 @@ class ExecutionContextSpec extends AkkaSpec with DefaultTimeout {
val es = Executors.newCachedThreadPool()
try {
val executor: Executor with ExecutionContext = ExecutionContext.fromExecutor(es)
executor should not be (null)
executor should not be null
val executorService: ExecutorService with ExecutionContext = ExecutionContext.fromExecutorService(es)
executorService should not be (null)
executorService should not be null
val jExecutor: ExecutionContextExecutor = ExecutionContext.fromExecutor(es)
jExecutor should not be (null)
jExecutor should not be null
val jExecutorService: ExecutionContextExecutorService = ExecutionContexts.fromExecutorService(es)
jExecutorService should not be (null)
jExecutorService should not be null
} finally {
es.shutdown
}

View file

@ -138,7 +138,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
val q = factory(config)
ensureInitialMailboxState(config, q)
EventFilter.warning(pattern = "received dead letter", occurrences = (enqueueN - dequeueN)).intercept {
EventFilter.warning(pattern = "received dead letter", occurrences = enqueueN - dequeueN).intercept {
def createProducer(fromNum: Int, toNum: Int): Future[Vector[Envelope]] = spawn {
val messages = Vector() ++ (for (i <- fromNum to toNum) yield createMessageInvocation(i))

View file

@ -308,7 +308,7 @@ class ScanningEventBusSpec extends EventBusSpec("ScanningEventBus") {
def createNewEventBus(): BusType = new MyScanningEventBus
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)
def createEvents(numberOfEvents: Int) = 0 until numberOfEvents
def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i }
@ -339,7 +339,7 @@ class LookupEventBusSpec extends EventBusSpec("LookupEventBus") {
def createNewEventBus(): BusType = new MyLookupEventBus
def createEvents(numberOfEvents: Int) = (0 until numberOfEvents)
def createEvents(numberOfEvents: Int) = 0 until numberOfEvents
def createSubscriber(pipeTo: ActorRef) = new Procedure[Int] { def apply(i: Int) = pipeTo ! i }

View file

@ -96,7 +96,7 @@ object LoggerSpec {
sender() ! LoggerInitialized
case SetTarget(ref, `qualifier`) =>
target = Some(ref)
ref ! ("OK")
ref ! "OK"
case event: LogEvent if !event.mdc.isEmpty =>
print(event)
target.foreach { _ ! event }
@ -173,7 +173,7 @@ class LoggerSpec extends AnyWordSpec with Matchers {
"log messages to standard output" in {
val out = createSystemAndLogToBuffer("defaultLogger", defaultConfig, true)
out.size should be > (0)
out.size should be > 0
}
"drain logger queue on system.terminate" in {

View file

@ -68,7 +68,8 @@ class LoggingReceiveSpec extends AnyWordSpec with BeforeAndAfterAll {
system.eventStream.subscribe(testActor, classOf[UnhandledMessage])
val a = system.actorOf(Props(new Actor {
def receive =
new LoggingReceive(Some("funky"), {
new LoggingReceive(Some("funky"),
{
case null =>
})
}))

View file

@ -58,9 +58,9 @@ class JavaLoggerSpec extends AkkaSpec(JavaLoggerSpec.config) {
val record = expectMsgType[logging.LogRecord]
record should not be (null)
record.getMillis should not be (0)
record.getThreadID should not be (0)
record should not be null
record.getMillis should not be 0
record.getThreadID should not be 0
record.getLevel should ===(logging.Level.SEVERE)
record.getMessage should ===("Simulated error")
record.getThrown.getClass should ===(classOf[JavaLoggerSpec.SimulatedExc])
@ -73,9 +73,9 @@ class JavaLoggerSpec extends AkkaSpec(JavaLoggerSpec.config) {
val record = expectMsgType[logging.LogRecord]
record should not be (null)
record.getMillis should not be (0)
record.getThreadID should not be (0)
record should not be null
record.getMillis should not be 0
record.getThreadID should not be 0
record.getLevel should ===(logging.Level.INFO)
record.getMessage should ===("3 is the magic number")
record.getThrown should ===(null)

View file

@ -405,7 +405,8 @@ class TcpConnectionSpec extends AkkaSpec("""
connectionActor ! ResumeReading
connectionHandler.expectMsgType[Received].data.decodeString("ASCII") should ===(vs)
} finally shutdown(system)
}
finally shutdown(system)
}
"close the connection and reply with `Closed` upon reception of a `Close` command" in
@ -653,7 +654,7 @@ class TcpConnectionSpec extends AkkaSpec("""
override lazy val connectionActor =
createConnectionActor(serverAddress = UnboundAddress, timeout = Option(100.millis))
run {
connectionActor.toString should not be ("")
connectionActor.toString should not be ""
userHandler.expectMsg(CommandFailed(Connect(UnboundAddress, timeout = Option(100.millis))))
watch(connectionActor)
expectTerminated(connectionActor)
@ -982,7 +983,7 @@ class TcpConnectionSpec extends AkkaSpec("""
override def run(body: => Unit): Unit = super.run {
try {
serverSideChannel.configureBlocking(false)
serverSideChannel should not be (null)
serverSideChannel should not be null
interestCallReceiver.expectMsg(OP_CONNECT)
selector.send(connectionActor, ChannelConnectable)

View file

@ -227,7 +227,8 @@ class AsyncDnsResolverSpec extends AkkaSpec("""
def resolver(clients: List[ActorRef], config: Config): ActorRef = {
val settings = new DnsSettings(system.asInstanceOf[ExtendedActorSystem], config)
system.actorOf(Props(new AsyncDnsResolver(settings, new SimpleDnsCache(), (_, _) => {
system.actorOf(Props(new AsyncDnsResolver(settings, new SimpleDnsCache(),
(_, _) => {
clients
})))
}

View file

@ -349,7 +349,7 @@ class CircuitBreakerSpec extends AkkaSpec("""
breaker().currentFailureCount should ===(0)
intercept[TestException] {
val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread
breaker().withSyncCircuitBreaker({ if (Thread.currentThread() eq ct) throwException else "fail" })
breaker().withSyncCircuitBreaker { if (Thread.currentThread() eq ct) throwException else "fail" }
}
breaker().currentFailureCount should ===(1)
breaker().withSyncCircuitBreaker(sayHi)
@ -362,7 +362,7 @@ class CircuitBreakerSpec extends AkkaSpec("""
breaker().currentFailureCount should ===(0)
intercept[TestException] {
val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread
breaker().withSyncCircuitBreaker({ if (Thread.currentThread() eq ct) throwException else "fail" })
breaker().withSyncCircuitBreaker { if (Thread.currentThread() eq ct) throwException else "fail" }
}
breaker().currentFailureCount should ===(1)
@ -385,7 +385,7 @@ class CircuitBreakerSpec extends AkkaSpec("""
breaker().currentFailureCount should ===(0)
intercept[TestException] {
val ct = Thread.currentThread() // Ensure that the thunk is executed in the tests thread
breaker().withSyncCircuitBreaker({ if (Thread.currentThread() eq ct) throwException else "fail" })
breaker().withSyncCircuitBreaker { if (Thread.currentThread() eq ct) throwException else "fail" }
}
breaker().currentFailureCount should ===(1)
breaker().succeed()

View file

@ -29,10 +29,10 @@ class RetrySpec extends AkkaSpec with RetrySupport {
@volatile var counter = 0
val retried = retry(
() =>
Future.successful({
Future.successful {
counter += 1
counter
}),
},
5,
1 second)
@ -94,7 +94,8 @@ class RetrySpec extends AkkaSpec with RetrySupport {
} else Future.successful(5)
}
val retried = retry(() => attempt(), 5, attempted => {
val retried = retry(() => attempt(), 5,
attempted => {
attemptedCount = attempted
Some(100.milliseconds * attempted)
})

View file

@ -77,7 +77,7 @@ class RandomSpec extends AkkaSpec with DefaultTimeout with ImplicitSender {
actor ! akka.routing.Broadcast("end")
Await.ready(doneLatch, 5 seconds)
replies.values.foreach { _ should be > (0) }
replies.values.foreach { _ should be > 0 }
replies.values.sum should ===(iterationCount * connectionCount)
}

View file

@ -227,7 +227,7 @@ class ResizerSpec extends AkkaSpec(ResizerSpec.config) with DefaultTimeout with
}
val z = routeeSize(router)
z should be > (2)
z should be > 2
Thread.sleep((300 millis).dilated.toMillis)

View file

@ -29,8 +29,8 @@ object ScatterGatherFirstCompletedSpec {
Props(new Actor {
def receive = {
case Stop(None) => context.stop(self)
case Stop(Some(_id)) if (_id == id) => context.stop(self)
case _id: Int if (_id == id) =>
case Stop(Some(_id)) if _id == id => context.stop(self)
case _id: Int if _id == id =>
case _ => {
Thread.sleep(100 * id)
sender() ! id

View file

@ -51,15 +51,15 @@ class SmallestMailboxSpec extends AkkaSpec with DefaultTimeout with ImplicitSend
busy.countDown()
val busyPath = usedActors.get(0)
busyPath should not be (null)
busyPath should not be null
val path1 = usedActors.get(1)
val path2 = usedActors.get(2)
val path3 = usedActors.get(3)
path1 should not be (busyPath)
path2 should not be (busyPath)
path3 should not be (busyPath)
path1 should not be busyPath
path2 should not be busyPath
path3 should not be busyPath
}
}

View file

@ -183,7 +183,7 @@ class BoundedBlockingQueueSpec
// queue.take() must happen first
Thread.sleep(50) // this is why this test is tagged as TimingTest
events should contain(awaitNotEmpty)
events should not contain (poll)
events should not contain poll
}
"block until the backing queue is non-empty" taggedAs TimingTest in {
@ -557,7 +557,7 @@ class BoundedBlockingQueueSpec
val target = mutable.Buffer[String]()
elems.foreach(queue.put)
queue.drainTo(target.asJava)
elems should contain theSameElementsAs (target)
elems should contain theSameElementsAs target
}
}
@ -617,7 +617,7 @@ class BoundedBlockingQueueSpec
queue.retainAll(elems.asJava) should equal(true)
queue.remainingCapacity() should equal(1)
queue.toArray() shouldNot contain("Akka")
queue.toArray() should contain theSameElementsAs (elems)
queue.toArray() should contain theSameElementsAs elems
}
"return false if no elements were removed" in {

View file

@ -313,10 +313,10 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers {
for (i <- 0 until data.length) builder.putLongPart(data(i), nBytes)(byteOrder)
reference.zipWithIndex
.collect({ // Since there is no partial put on LongBuffer, we need to collect only the interesting bytes
.collect { // Since there is no partial put on LongBuffer, we need to collect only the interesting bytes
case (r, i) if byteOrder == ByteOrder.LITTLE_ENDIAN && i % elemSize < nBytes => r
case (r, i) if byteOrder == ByteOrder.BIG_ENDIAN && i % elemSize >= (elemSize - nBytes) => r
})
}
.toSeq == builder.result()
}
@ -886,13 +886,13 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers {
"calling span" in {
check { (a: ByteString, b: Byte) =>
likeVector(a)({ _.span(_ != b) match { case (a, b) => (a, b) } })
likeVector(a) { _.span(_ != b) match { case (a, b) => (a, b) } }
}
}
"calling takeWhile" in {
check { (a: ByteString, b: Byte) =>
likeVector(a)({ _.takeWhile(_ != b) })
likeVector(a) { _.takeWhile(_ != b) }
}
}
"calling dropWhile" in {
@ -940,9 +940,9 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers {
check { (slice: ByteStringSlice) =>
slice match {
case (xs, from, until) =>
likeVector(xs)({
likeVector(xs) {
_.slice(from, until)
})
}
}
}
}
@ -951,9 +951,9 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers {
check { (slice: ByteStringSlice) =>
slice match {
case (xs, from, until) =>
likeVector(xs)({
likeVector(xs) {
_.drop(from).take(until - from)
})
}
}
}
}
@ -970,11 +970,11 @@ class ByteStringSpec extends AnyWordSpec with Matchers with Checkers {
check { (slice: ByteStringSlice) =>
slice match {
case (xs, from, until) =>
likeVector(xs)({ it =>
likeVector(xs) { it =>
val array = new Array[Byte](xs.length)
it.copyToArray(array, from, until)
array.toSeq
})
}
}
}
}

View file

@ -19,7 +19,8 @@ import akka.testkit.DefaultTimeout
class IndexSpec extends AkkaSpec with Matchers with DefaultTimeout {
implicit val ec: ExecutionContextExecutor = system.dispatcher
private def emptyIndex =
new Index[String, Int](100, new Comparator[Int] {
new Index[String, Int](100,
new Comparator[Int] {
override def compare(a: Int, b: Int): Int = Integer.compare(a, b)
})
@ -101,7 +102,8 @@ class IndexSpec extends AkkaSpec with Matchers with DefaultTimeout {
index.isEmpty should ===(true)
}
"be able to be accessed in parallel" in {
val index = new Index[Int, Int](100, new Comparator[Int] {
val index = new Index[Int, Int](100,
new Comparator[Int] {
override def compare(a: Int, b: Int): Int = Integer.compare(a, b)
})
val nrOfTasks = 10000

View file

@ -594,7 +594,9 @@ class ImmutableJavaBehaviorSpec extends Messages with Become with Stoppable {
class TransformMessagesJavaBehaviorSpec extends ImmutableWithSignalJavaBehaviorSpec with Reuse with Siphon {
override def behavior(monitor: ActorRef[Event]): (Behavior[Command], Aux) = {
val inbox = TestInbox[Command]("transformMessagesListener")
JBehaviors.transformMessages(classOf[Command], super.behavior(monitor)._1, pf(_.`match`(classOf[Command], fi(x => {
JBehaviors.transformMessages(classOf[Command], super.behavior(monitor)._1,
pf(_.`match`(classOf[Command],
fi(x => {
inbox.ref ! x
x
})))) -> inbox

View file

@ -120,7 +120,8 @@ class DeferredSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with L
// monitor is implemented with tap, so this is testing both
val probe = TestProbe[Event]("evt")
val monitorProbe = TestProbe[Command]("monitor")
val behv = Behaviors.monitor(monitorProbe.ref, Behaviors.setup[Command] { _ =>
val behv = Behaviors.monitor(monitorProbe.ref,
Behaviors.setup[Command] { _ =>
probe.ref ! Started
target(probe.ref)
})

View file

@ -483,7 +483,8 @@ class InterceptSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with
"be possible to combine with MDC" in {
val probe = createTestProbe[String]()
val ref = spawn(Behaviors.setup[Command] { _ =>
Behaviors.withMdc(staticMdc = Map("x" -> "y"), mdcForMessage = (msg: Command) => {
Behaviors.withMdc(staticMdc = Map("x" -> "y"),
mdcForMessage = (msg: Command) => {
probe.ref ! s"mdc:${msg.s.toUpperCase()}"
Map("msg" -> msg.s.toUpperCase())
}) {

View file

@ -103,7 +103,8 @@ class TestConsumer(
case job @ SomeAsyncJob(_, confirmTo, producerId, seqNr) =>
// when replacing producer the seqNr may start from 1 again
val cleanProcessed =
if (seqNr == 1L) processed.filterNot { case (pid, _) => pid == producerId } else processed
if (seqNr == 1L) processed.filterNot { case (pid, _) => pid == producerId }
else processed
if (cleanProcessed((producerId, seqNr)))
throw new RuntimeException(s"Received duplicate [($producerId,$seqNr)]")

View file

@ -62,7 +62,8 @@ class ActorSystemSpec
"An ActorSystem" must {
"start the guardian actor and terminate when it terminates" in {
withSystem("a", Behaviors.receiveMessage[Probe] { p =>
withSystem("a",
Behaviors.receiveMessage[Probe] { p =>
p.replyTo ! p.message
Behaviors.stopped
}, doTerminate = false) { sys =>

View file

@ -72,7 +72,8 @@ class AdaptationFailureSpec extends ScalaTestWithActorTestKit with AnyWordSpecLi
val probe = createTestProbe[Any]()
val threw = Promise[Done]()
val ref = spawn(Behaviors.setup[Any] { ctx =>
val adapter = ctx.messageAdapter[Any](classOf[Any], { _ =>
val adapter = ctx.messageAdapter[Any](classOf[Any],
{ _ =>
threw.success(Done)
throw TestException("boom")
})

View file

@ -119,13 +119,13 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
}
"contain the class name where the first log was called" in {
val eventFilter = LoggingTestKit.custom({
val eventFilter = LoggingTestKit.custom {
case event if event.loggerName == classOf[ActorLoggingSpec].getName =>
true
case event =>
println(event.loggerName)
false
})
}
eventFilter.expect(spawn(Behaviors.setup[String] { context =>
context.log.info("Started")
@ -139,23 +139,23 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
}
"contain the object class name where the first log was called" in {
val eventFilter = LoggingTestKit.custom({
val eventFilter = LoggingTestKit.custom {
case event if event.loggerName == WhereTheBehaviorIsDefined.getClass.getName => true
case other =>
println(other.loggerName)
false
})
}
eventFilter.expect(spawn(WhereTheBehaviorIsDefined.behavior, "the-actor-with-object"))
}
"contain the abstract behavior class name where the first log was called" in {
val eventFilter = LoggingTestKit.custom({
val eventFilter = LoggingTestKit.custom {
case event if event.loggerName == classOf[BehaviorWhereTheLoggerIsUsed].getName => true
case other =>
println(other.loggerName)
false
})
}
eventFilter.expect {
spawn(Behaviors.setup[String](context => new BehaviorWhereTheLoggerIsUsed(context)), "the-actor-with-behavior")
@ -200,7 +200,7 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
true // any is fine, we're just after the right count of statements reaching the listener
}
.withOccurrences(36)
.expect({
.expect {
spawn(Behaviors.setup[String] {
context =>
context.log.debug("message")
@ -247,7 +247,7 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
Behaviors.stopped
})
})
}
}
"use Slf4jLogger from akka-slf4j automatically" in {
@ -490,8 +490,7 @@ class ActorLoggingSpec extends ScalaTestWithActorTestKit("""
Map(
ActorMdc.AkkaAddressKey -> system.classicSystem.asInstanceOf[ExtendedActorSystem].provider.addressString,
ActorMdc.AkkaSourceKey -> actorPath.get.toString,
ActorMdc.SourceActorSystemKey -> system.name)
)
ActorMdc.SourceActorSystemKey -> system.name))
true
} catch {
case ex: Throwable =>

View file

@ -231,7 +231,7 @@ class AdapterSpec extends AnyWordSpec with Matchers with BeforeAndAfterAll with
}
"allow seamless access to untyped extensions" in {
SerializationExtension(typedSystem) should not be (null)
SerializationExtension(typedSystem) should not be null
}
}

View file

@ -11,7 +11,7 @@ import scala.concurrent.{ ExecutionContextExecutor, Future }
import com.typesafe.config.{ Config, ConfigFactory }
import org.slf4j.Logger
import akka.{ Done, actor => classic }
import akka.{ actor => classic, Done }
import akka.actor.{ Address, BootstrapSetup, ClassicActorSystemProvider }
import akka.actor.setup.ActorSystemSetup
import akka.actor.typed.eventstream.EventStream

View file

@ -133,7 +133,6 @@ object Behavior {
* The `ClassTag` for `Outer` ensures that only messages of this class or a subclass thereof will be
* intercepted. Other message types (e.g. a private protocol) will bypass
* the interceptor and be continue to the inner behavior untouched.
*
*/
def transformMessages[Outer: ClassTag](matcher: PartialFunction[Outer, Inner]): Behavior[Outer] =
BehaviorImpl.transformMessages(behavior, matcher)

View file

@ -39,7 +39,6 @@ trait Scheduler {
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
*
* Note: For scheduling within actors `Behaviors.withTimers` or `ActorContext.scheduleOnce` should be preferred.
*
*/
def scheduleOnce(delay: java.time.Duration, runnable: Runnable, executor: ExecutionContext): Cancellable
@ -62,7 +61,6 @@ trait Scheduler {
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
*
* Note: For scheduling within actors `Behaviors.withTimers` should be preferred.
*
*/
def scheduleWithFixedDelay(initialDelay: FiniteDuration, delay: FiniteDuration)(runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable
@ -124,7 +122,6 @@ trait Scheduler {
* reach (calculated as: `delay / tickNanos > Int.MaxValue`).
*
* Note: For scheduling within actors `Behaviors.withTimers` should be preferred.
*
*/
def scheduleAtFixedRate(initialDelay: FiniteDuration, interval: FiniteDuration)(runnable: Runnable)(
implicit executor: ExecutionContext): Cancellable

View file

@ -258,8 +258,7 @@ object ProducerControllerImpl {
settings: ProducerController.Settings,
initialState: Option[DurableProducerQueue.State[A]])(
thenBecomeActive: (
ActorRef[RequestNext[A]],
ActorRef[ConsumerController.Command[A]],
ActorRef[RequestNext[A]], ActorRef[ConsumerController.Command[A]],
DurableProducerQueue.State[A]) => Behavior[InternalCommand]): Behavior[InternalCommand] = {
Behaviors.receiveMessagePartial[InternalCommand] {
case RegisterConsumer(c: ActorRef[ConsumerController.Command[A]] @unchecked) =>
@ -346,7 +345,8 @@ object ProducerControllerImpl {
val manifest = Serializers.manifestFor(ser, mAnyRef)
val serializerId = ser.identifier
if (bytes.length <= chunkSize) {
ChunkedMessage(ByteString.fromArrayUnsafe(bytes), firstChunk = true, lastChunk = true, serializerId, manifest) :: Nil
ChunkedMessage(ByteString.fromArrayUnsafe(bytes), firstChunk = true, lastChunk = true, serializerId,
manifest) :: Nil
} else {
val builder = Vector.newBuilder[ChunkedMessage]
val chunksIter = ByteString.fromArrayUnsafe(bytes).grouped(chunkSize)

View file

@ -443,7 +443,8 @@ private class WorkPullingProducerControllerImpl[A: ClassTag](
currentSeqNr = s.currentSeqNr + 1,
preselectedWorkers =
s.preselectedWorkers.updated(s.currentSeqNr, PreselectedWorker(outKey, out.confirmationQualifier)),
handOver = s.handOver.updated(s.currentSeqNr, HandOver(resend.oldConfirmationQualifier, resend.oldSeqNr))))
handOver =
s.handOver.updated(s.currentSeqNr, HandOver(resend.oldConfirmationQualifier, resend.oldSeqNr))))
case None =>
checkStashFull(stashBuffer)
// no demand from any workers, or all already preselected

View file

@ -43,7 +43,6 @@ object EventStream {
* def subscribe(actorSystem: ActorSystem[_], actorRef: ActorRef[A]) =
* actorSystem.eventStream ! EventStream.Subscribe[A1](actorRef)
* }}}
*
*/
final case class Subscribe[E](subscriber: ActorRef[E])(implicit classTag: ClassTag[E]) extends Command {

View file

@ -218,7 +218,7 @@ import scala.util.Success
override def ask[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[Res] => Req)(
mapResponse: Try[Res] => T)(implicit responseTimeout: Timeout, classTag: ClassTag[Res]): Unit = {
import akka.actor.typed.scaladsl.AskPattern._
pipeToSelf((target.ask(createRequest))(responseTimeout, system.scheduler))(mapResponse)
pipeToSelf(target.ask(createRequest)(responseTimeout, system.scheduler))(mapResponse)
}
override def askWithStatus[Req, Res](target: RecipientRef[Req], createRequest: ActorRef[StatusReply[Res]] => Req)(

View file

@ -95,7 +95,8 @@ private[akka] final class InterceptorImpl[O, I](
private def deduplicate(interceptedResult: Behavior[I], ctx: TypedActorContext[O]): Behavior[O] = {
val started = Behavior.start(interceptedResult, ctx.asInstanceOf[TypedActorContext[I]])
if (started == BehaviorImpl.UnhandledBehavior || started == BehaviorImpl.SameBehavior || !Behavior.isAlive(started)) {
if (started == BehaviorImpl.UnhandledBehavior || started == BehaviorImpl.SameBehavior || !Behavior.isAlive(
started)) {
started.unsafeCast[O]
} else {
// returned behavior could be nested in setups, so we need to start before we deduplicate

View file

@ -186,7 +186,8 @@ import java.util.function.Predicate
else {
val node = messages.next()
val message = wrap(node.message)
val interpretResult = try {
val interpretResult =
try {
message match {
case sig: Signal => Behavior.interpretSignal(b2, ctx, sig)
case msg => interpretUnstashedMessage(b2, ctx, msg, node)

View file

@ -303,7 +303,8 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
override protected def handleSignalException(
ctx: TypedActorContext[Any],
target: SignalTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, signalRestart = {
handleException(ctx,
signalRestart = {
case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart)
case _ => target(ctx, PreRestart)
})
@ -311,7 +312,8 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
override protected def handleReceiveException(
ctx: TypedActorContext[Any],
target: ReceiveTarget[T]): Catcher[Behavior[T]] = {
handleException(ctx, signalRestart = {
handleException(ctx,
signalRestart = {
case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart)
case _ => target.signalRestart(ctx)
})
@ -391,7 +393,8 @@ private class RestartSupervisor[T, Thr <: Throwable: ClassTag](initial: Behavior
case _ => newBehavior
}
nextBehavior.narrow
} catch handleException(ctx, signalRestart = {
} catch handleException(ctx,
signalRestart = {
case e: UnstashException[Any] @unchecked => Behavior.interpretSignal(e.behavior, ctx, PreRestart)
case _ => ()
})

View file

@ -33,7 +33,6 @@ private[typed] object SystemMessageList {
}
/**
*
* INTERNAL API
*
* Value class supporting list operations on system messages. The `next` field of [[SystemMessage]]
@ -45,7 +44,6 @@ private[typed] object SystemMessageList {
*
* The type of the list also encodes that the messages contained are in reverse order, i.e. the head of the list is the
* latest appended element.
*
*/
private[typed] class LatestFirstSystemMessageList(val head: SystemMessage) extends AnyVal {
import SystemMessageList._
@ -94,7 +92,6 @@ private[typed] class LatestFirstSystemMessageList(val head: SystemMessage) exten
}
/**
*
* INTERNAL API
*
* Value class supporting list operations on system messages. The `next` field of [[SystemMessage]]
@ -106,7 +103,6 @@ private[typed] class LatestFirstSystemMessageList(val head: SystemMessage) exten
*
* This list type also encodes that the messages contained are in reverse order, i.e. the head of the list is the
* latest appended element.
*
*/
private[typed] class EarliestFirstSystemMessageList(val head: SystemMessage) extends AnyVal {
import SystemMessageList._

View file

@ -187,7 +187,8 @@ import akka.util.OptionVal
private def withSafelyAdapted[U, V](adapt: () => U)(body: U => V): Unit = {
var failed = false
val adapted: U = try {
val adapted: U =
try {
adapt()
} catch {
case NonFatal(ex) =>
@ -248,7 +249,8 @@ import akka.util.OptionVal
classic.SupervisorStrategy.Stop
else
ActorAdapter.classicSupervisorDecider(ex)
} finally {
}
finally {
ctx.clearCurrentActorThread()
}
}

View file

@ -197,7 +197,8 @@ private[akka] object LocalReceptionist extends ReceptionistBehaviorProvider {
case None =>
}
updateServices(Set(key), { state =>
updateServices(Set(key),
{ state =>
val newState = state.serviceInstanceRemoved(key)(serviceInstance)
if (state.servicesPerActor.getOrElse(serviceInstance, Set.empty).isEmpty)
ctx.unwatch(serviceInstance)

View file

@ -27,7 +27,6 @@ import akka.util.JavaDurationConverters._
* message that is sent to the target Actor in order to function as a reply-to
* address, therefore the argument to the ask method is not the message itself
* but a function that given the reply-to address will create the message.
*
*/
object AskPattern {
@ -40,7 +39,7 @@ object AskPattern {
messageFactory: JFunction[ActorRef[Res], Req],
timeout: Duration,
scheduler: Scheduler): CompletionStage[Res] =
(actor.ask(messageFactory.apply)(timeout.asScala, scheduler)).toJava
actor.ask(messageFactory.apply)(timeout.asScala, scheduler).toJava
/**
* The same as [[ask]] but only for requests that result in a response of type [[akka.pattern.StatusReply]].
@ -53,6 +52,6 @@ object AskPattern {
messageFactory: JFunction[ActorRef[StatusReply[Res]], Req],
timeout: Duration,
scheduler: Scheduler): CompletionStage[Res] =
(actor.askWithStatus(messageFactory.apply)(timeout.asScala, scheduler).toJava)
actor.askWithStatus(messageFactory.apply)(timeout.asScala, scheduler).toJava
}

View file

@ -5,7 +5,7 @@
package akka.actor.typed.javadsl
import java.util.Collections
import java.util.function.{ Supplier, Function => JFunction }
import java.util.function.{ Function => JFunction, Supplier }
import scala.reflect.ClassTag
@ -154,7 +154,8 @@ object Behaviors {
def receive[T](
onMessage: JapiFunction2[ActorContext[T], T, Behavior[T]],
onSignal: JapiFunction2[ActorContext[T], Signal, Behavior[T]]): Behavior[T] = {
new BehaviorImpl.ReceiveBehavior((ctx, msg) => onMessage.apply(ctx.asJava, msg), {
new BehaviorImpl.ReceiveBehavior((ctx, msg) => onMessage.apply(ctx.asJava, msg),
{
case (ctx, sig) => onSignal.apply(ctx.asJava, sig)
})
}
@ -327,7 +328,6 @@ object Behaviors {
* each message processing by the inner behavior is done.
* @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through
* `ActorContext.log`
*
*/
def withMdc[T](
interceptMessageClass: Class[T],
@ -344,7 +344,6 @@ object Behaviors {
* @param staticMdc This MDC is setup in the logging context for every message
* @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through
* `ActorContext.log`
*
*/
def withMdc[T](
interceptMessageClass: Class[T],
@ -369,7 +368,6 @@ object Behaviors {
* each message processing by the inner behavior is done.
* @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through
* `ActorContext.log`
*
*/
def withMdc[T](
interceptMessageClass: Class[T],

View file

@ -28,7 +28,6 @@ abstract class Receive[T] extends ExtensibleBehavior[T] {
* * returning `stopped` will terminate this Behavior
* * returning `same` designates to reuse the current Behavior
* * returning `unhandled` keeps the same Behavior and signals that the message was not yet handled
*
*/
@throws(classOf[Exception])
def receiveMessage(msg: T): Behavior[T]

View file

@ -77,9 +77,11 @@ final class ReceiveBuilder[T] private (
* @return this behavior builder
*/
def onMessageEquals(msg: T, handler: Creator[Behavior[T]]): ReceiveBuilder[T] =
withMessage(OptionVal.Some(msg.getClass), OptionVal.Some(new JPredicate[T] {
override def test(param: T): Boolean = param == (msg)
}), new JFunction[T, Behavior[T]] {
withMessage(OptionVal.Some(msg.getClass),
OptionVal.Some(new JPredicate[T] {
override def test(param: T): Boolean = param == msg
}),
new JFunction[T, Behavior[T]] {
// invoke creator without the message
override def apply(param: T): Behavior[T] = handler.create()
})
@ -128,9 +130,11 @@ final class ReceiveBuilder[T] private (
* @return this behavior builder
*/
def onSignalEquals(signal: Signal, handler: Creator[Behavior[T]]): ReceiveBuilder[T] =
withSignal(signal.getClass, OptionVal.Some(new JPredicate[Signal] {
withSignal(signal.getClass,
OptionVal.Some(new JPredicate[Signal] {
override def test(param: Signal): Boolean = param == signal
}), new JFunction[Signal, Behavior[T]] {
}),
new JFunction[Signal, Behavior[T]] {
override def apply(param: Signal): Behavior[T] = handler.create()
})

View file

@ -8,7 +8,7 @@ import akka.actor.typed.{ scaladsl, Behavior }
import akka.annotation.DoNotInherit
import akka.japi.function.Procedure
import java.util.function.{ Predicate, Function => JFunction }
import java.util.function.{ Function => JFunction, Predicate }
/**
* A non thread safe mutable message buffer that can be used to buffer messages inside actors

View file

@ -49,7 +49,6 @@ abstract class AbstractBehavior[T](protected val context: ActorContext[T]) exten
* <li>returning `this` or `same` designates to reuse the current Behavior</li>
* <li>returning `unhandled` keeps the same Behavior and signals that the message was not yet handled</li>
* </ul>
*
*/
@throws(classOf[Exception])
def onMessage(msg: T): Behavior[T]
@ -87,7 +86,8 @@ abstract class AbstractBehavior[T](protected val context: ActorContext[T]) exten
@throws(classOf[Exception])
override final def receiveSignal(ctx: TypedActorContext[T], msg: Signal): Behavior[T] = {
checkRightContext(ctx)
onSignal.applyOrElse(msg, {
onSignal.applyOrElse(msg,
{
case MessageAdaptionFailure(ex) => throw ex
case _ => Behaviors.unhandled
}: PartialFunction[Signal, Behavior[T]])

View file

@ -250,7 +250,6 @@ object Behaviors {
* each message processing by the inner behavior is done.
* @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through
* `ActorContext.log`
*
*/
def withMdc[T: ClassTag](mdcForMessage: T => Map[String, String])(behavior: Behavior[T]): Behavior[T] =
withMdc[T](Map.empty[String, String], mdcForMessage)(behavior)
@ -265,7 +264,6 @@ object Behaviors {
* @param staticMdc This MDC is setup in the logging context for every message
* @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through
* `ActorContext.log`
*
*/
def withMdc[T: ClassTag](staticMdc: Map[String, String])(behavior: Behavior[T]): Behavior[T] =
withMdc[T](staticMdc, (_: T) => Map.empty[String, String])(behavior)
@ -288,7 +286,6 @@ object Behaviors {
* each message processing by the inner behavior is done.
* @param behavior The actual behavior handling the messages, the MDC is used for the log entries logged through
* `ActorContext.log`
*
*/
def withMdc[T: ClassTag](staticMdc: Map[String, String], mdcForMessage: T => Map[String, String])(
behavior: Behavior[T]): Behavior[T] =

View file

@ -70,12 +70,12 @@ object ByteIterator {
final override def clone: ByteArrayIterator = new ByteArrayIterator(array, from, until)
final override def take(n: Int): this.type = {
if (n < len) until = { if (n > 0) (from + n) else from }
if (n < len) until = { if (n > 0) from + n else from }
this
}
final override def drop(n: Int): this.type = {
if (n > 0) from = { if (n < len) (from + n) else until }
if (n > 0) from = { if (n < len) from + n else until }
this
}
@ -153,7 +153,7 @@ object ByteIterator {
def asInputStream: java.io.InputStream = new java.io.InputStream {
override def available: Int = iterator.len
def read: Int = if (hasNext) (next().toInt & 0xff) else -1
def read: Int = if (hasNext) next().toInt & 0xFF else -1
override def read(b: Array[Byte], off: Int, len: Int): Int = {
if ((off < 0) || (len < 0) || (off + len > b.length)) throw new IndexOutOfBoundsException
@ -372,7 +372,7 @@ object ByteIterator {
def asInputStream: java.io.InputStream = new java.io.InputStream {
override def available: Int = current.len
def read: Int = if (hasNext) (next().toInt & 0xff) else -1
def read: Int = if (hasNext) next().toInt & 0xFF else -1
override def read(b: Array[Byte], off: Int, len: Int): Int = {
val nRead = current.asInputStream.read(b, off, len)
@ -511,9 +511,9 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
*/
def getShort(implicit byteOrder: ByteOrder): Short = {
if (byteOrder == ByteOrder.BIG_ENDIAN)
((next() & 0xff) << 8 | (next() & 0xff) << 0).toShort
((next() & 0xFF) << 8 | (next() & 0xFF) << 0).toShort
else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
((next() & 0xff) << 0 | (next() & 0xff) << 8).toShort
((next() & 0xFF) << 0 | (next() & 0xFF) << 8).toShort
else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}
@ -522,15 +522,15 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
*/
def getInt(implicit byteOrder: ByteOrder): Int = {
if (byteOrder == ByteOrder.BIG_ENDIAN)
((next() & 0xff) << 24
| (next() & 0xff) << 16
| (next() & 0xff) << 8
| (next() & 0xff) << 0)
((next() & 0xFF) << 24
| (next() & 0xFF) << 16
| (next() & 0xFF) << 8
| (next() & 0xFF) << 0)
else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
((next() & 0xff) << 0
| (next() & 0xff) << 8
| (next() & 0xff) << 16
| (next() & 0xff) << 24)
((next() & 0xFF) << 0
| (next() & 0xFF) << 8
| (next() & 0xFF) << 16
| (next() & 0xFF) << 24)
else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}
@ -539,23 +539,23 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
*/
def getLong(implicit byteOrder: ByteOrder): Long = {
if (byteOrder == ByteOrder.BIG_ENDIAN)
((next().toLong & 0xff) << 56
| (next().toLong & 0xff) << 48
| (next().toLong & 0xff) << 40
| (next().toLong & 0xff) << 32
| (next().toLong & 0xff) << 24
| (next().toLong & 0xff) << 16
| (next().toLong & 0xff) << 8
| (next().toLong & 0xff) << 0)
((next().toLong & 0xFF) << 56
| (next().toLong & 0xFF) << 48
| (next().toLong & 0xFF) << 40
| (next().toLong & 0xFF) << 32
| (next().toLong & 0xFF) << 24
| (next().toLong & 0xFF) << 16
| (next().toLong & 0xFF) << 8
| (next().toLong & 0xFF) << 0)
else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
((next().toLong & 0xff) << 0
| (next().toLong & 0xff) << 8
| (next().toLong & 0xff) << 16
| (next().toLong & 0xff) << 24
| (next().toLong & 0xff) << 32
| (next().toLong & 0xff) << 40
| (next().toLong & 0xff) << 48
| (next().toLong & 0xff) << 56)
((next().toLong & 0xFF) << 0
| (next().toLong & 0xFF) << 8
| (next().toLong & 0xFF) << 16
| (next().toLong & 0xFF) << 24
| (next().toLong & 0xFF) << 32
| (next().toLong & 0xFF) << 40
| (next().toLong & 0xFF) << 48
| (next().toLong & 0xFF) << 56)
else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}
@ -566,11 +566,11 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
def getLongPart(n: Int)(implicit byteOrder: ByteOrder): Long = {
if (byteOrder == ByteOrder.BIG_ENDIAN) {
var x = 0L
(1 to n).foreach(_ => x = (x << 8) | (next() & 0xff))
(1 to n).foreach(_ => x = (x << 8) | (next() & 0xFF))
x
} else if (byteOrder == ByteOrder.LITTLE_ENDIAN) {
var x = 0L
(0 until n).foreach(i => x |= (next() & 0xff) << 8 * i)
(0 until n).foreach(i => x |= (next() & 0xFF) << 8 * i)
x
} else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}

View file

@ -75,7 +75,6 @@ object ByteString {
* want wrap it into an ByteArray, and from there on only use that reference (the ByteString)
* to operate on the wrapped data. For all other intents and purposes, please use the usual
* apply and create methods - which provide the immutability guarantees by copying the array.
*
*/
def fromArrayUnsafe(array: Array[Byte]): ByteString = ByteString1C(array)
@ -100,7 +99,6 @@ object ByteString {
* want wrap it into an ByteArray, and from there on only use that reference (the ByteString)
* to operate on the wrapped data. For all other intents and purposes, please use the usual
* apply and create methods - which provide the immutability guarantees by copying the array.
*
*/
def fromArrayUnsafe(array: Array[Byte], offset: Int, length: Int): ByteString = ByteString1(array, offset, length)
@ -304,7 +302,7 @@ object ByteString {
os.write(bytes, startIndex, length)
}
def isCompact: Boolean = (length == bytes.length)
def isCompact: Boolean = length == bytes.length
private[akka] def byteStringCompanion = ByteString1

View file

@ -5,7 +5,7 @@
package akka.util
import scala.{ collection => c }
import scala.collection.{ GenTraversable, immutable => i, mutable => m }
import scala.collection.{ immutable => i, mutable => m, GenTraversable }
import scala.collection.generic.{ CanBuildFrom, GenericCompanion, Sorted, SortedSetFactory }
import scala.language.higherKinds
import scala.language.implicitConversions
@ -49,8 +49,8 @@ package object ccompat {
simpleCBF(fact.newBuilder[A])
private[akka] implicit def sortedSetCompanionToCBF[
A: Ordering,
CC[X] <: c.SortedSet[X] with c.SortedSetLike[X, CC[X]]](fact: SortedSetFactory[CC]): CanBuildFrom[Any, A, CC[A]] =
A: Ordering, CC[X] <: c.SortedSet[X] with c.SortedSetLike[X, CC[X]]](
fact: SortedSetFactory[CC]): CanBuildFrom[Any, A, CC[A]] =
simpleCBF(fact.newBuilder[A])
private[ccompat] def build[T, CC](builder: m.Builder[T, CC], source: TraversableOnce[T]): CC = {

View file

@ -72,12 +72,12 @@ object ByteIterator {
final override def clone: ByteArrayIterator = new ByteArrayIterator(array, from, until)
final override def take(n: Int): this.type = {
if (n < len) until = { if (n > 0) (from + n) else from }
if (n < len) until = { if (n > 0) from + n else from }
this
}
final override def drop(n: Int): this.type = {
if (n > 0) from = { if (n < len) (from + n) else until }
if (n > 0) from = { if (n < len) from + n else until }
this
}
@ -165,7 +165,7 @@ object ByteIterator {
def asInputStream: java.io.InputStream = new java.io.InputStream {
override def available: Int = iterator.len
def read: Int = if (hasNext) (next().toInt & 0xff) else -1
def read: Int = if (hasNext) next().toInt & 0xFF else -1
override def read(b: Array[Byte], off: Int, len: Int): Int = {
if ((off < 0) || (len < 0) || (off + len > b.length)) throw new IndexOutOfBoundsException
@ -386,7 +386,7 @@ object ByteIterator {
def asInputStream: java.io.InputStream = new java.io.InputStream {
override def available: Int = current.len
def read: Int = if (hasNext) (next().toInt & 0xff) else -1
def read: Int = if (hasNext) next().toInt & 0xFF else -1
override def read(b: Array[Byte], off: Int, len: Int): Int = {
val nRead = current.asInputStream.read(b, off, len)
@ -527,9 +527,9 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
*/
def getShort(implicit byteOrder: ByteOrder): Short = {
if (byteOrder == ByteOrder.BIG_ENDIAN)
((next() & 0xff) << 8 | (next() & 0xff) << 0).toShort
((next() & 0xFF) << 8 | (next() & 0xFF) << 0).toShort
else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
((next() & 0xff) << 0 | (next() & 0xff) << 8).toShort
((next() & 0xFF) << 0 | (next() & 0xFF) << 8).toShort
else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}
@ -538,15 +538,15 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
*/
def getInt(implicit byteOrder: ByteOrder): Int = {
if (byteOrder == ByteOrder.BIG_ENDIAN)
((next() & 0xff) << 24
| (next() & 0xff) << 16
| (next() & 0xff) << 8
| (next() & 0xff) << 0)
((next() & 0xFF) << 24
| (next() & 0xFF) << 16
| (next() & 0xFF) << 8
| (next() & 0xFF) << 0)
else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
((next() & 0xff) << 0
| (next() & 0xff) << 8
| (next() & 0xff) << 16
| (next() & 0xff) << 24)
((next() & 0xFF) << 0
| (next() & 0xFF) << 8
| (next() & 0xFF) << 16
| (next() & 0xFF) << 24)
else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}
@ -555,23 +555,23 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
*/
def getLong(implicit byteOrder: ByteOrder): Long = {
if (byteOrder == ByteOrder.BIG_ENDIAN)
((next().toLong & 0xff) << 56
| (next().toLong & 0xff) << 48
| (next().toLong & 0xff) << 40
| (next().toLong & 0xff) << 32
| (next().toLong & 0xff) << 24
| (next().toLong & 0xff) << 16
| (next().toLong & 0xff) << 8
| (next().toLong & 0xff) << 0)
((next().toLong & 0xFF) << 56
| (next().toLong & 0xFF) << 48
| (next().toLong & 0xFF) << 40
| (next().toLong & 0xFF) << 32
| (next().toLong & 0xFF) << 24
| (next().toLong & 0xFF) << 16
| (next().toLong & 0xFF) << 8
| (next().toLong & 0xFF) << 0)
else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
((next().toLong & 0xff) << 0
| (next().toLong & 0xff) << 8
| (next().toLong & 0xff) << 16
| (next().toLong & 0xff) << 24
| (next().toLong & 0xff) << 32
| (next().toLong & 0xff) << 40
| (next().toLong & 0xff) << 48
| (next().toLong & 0xff) << 56)
((next().toLong & 0xFF) << 0
| (next().toLong & 0xFF) << 8
| (next().toLong & 0xFF) << 16
| (next().toLong & 0xFF) << 24
| (next().toLong & 0xFF) << 32
| (next().toLong & 0xFF) << 40
| (next().toLong & 0xFF) << 48
| (next().toLong & 0xFF) << 56)
else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}
@ -582,11 +582,11 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
def getLongPart(n: Int)(implicit byteOrder: ByteOrder): Long = {
if (byteOrder == ByteOrder.BIG_ENDIAN) {
var x = 0L
(1 to n).foreach(_ => x = (x << 8) | (next() & 0xff))
(1 to n).foreach(_ => x = (x << 8) | (next() & 0xFF))
x
} else if (byteOrder == ByteOrder.LITTLE_ENDIAN) {
var x = 0L
(0 until n).foreach(i => x |= (next() & 0xff) << 8 * i)
(0 until n).foreach(i => x |= (next() & 0xFF) << 8 * i)
x
} else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}

View file

@ -78,7 +78,6 @@ object ByteString {
* want wrap it into an ByteArray, and from there on only use that reference (the ByteString)
* to operate on the wrapped data. For all other intents and purposes, please use the usual
* apply and create methods - which provide the immutability guarantees by copying the array.
*
*/
def fromArrayUnsafe(array: Array[Byte]): ByteString = ByteString1C(array)
@ -103,7 +102,6 @@ object ByteString {
* want wrap it into an ByteArray, and from there on only use that reference (the ByteString)
* to operate on the wrapped data. For all other intents and purposes, please use the usual
* apply and create methods - which provide the immutability guarantees by copying the array.
*
*/
def fromArrayUnsafe(array: Array[Byte], offset: Int, length: Int): ByteString = ByteString1(array, offset, length)
@ -313,7 +311,7 @@ object ByteString {
os.write(bytes, startIndex, length)
}
def isCompact: Boolean = (length == bytes.length)
def isCompact: Boolean = length == bytes.length
private[akka] def byteStringCompanion = ByteString1

View file

@ -71,12 +71,12 @@ object ByteIterator {
final override def clone: ByteArrayIterator = new ByteArrayIterator(array, from, until)
final override def take(n: Int): this.type = {
if (n < len) until = { if (n > 0) (from + n) else from }
if (n < len) until = { if (n > 0) from + n else from }
this
}
final override def drop(n: Int): this.type = {
if (n > 0) from = { if (n < len) (from + n) else until }
if (n > 0) from = { if (n < len) from + n else until }
this
}
@ -162,7 +162,7 @@ object ByteIterator {
def asInputStream: java.io.InputStream = new java.io.InputStream {
override def available: Int = iterator.len
def read: Int = if (hasNext) (next().toInt & 0xff) else -1
def read: Int = if (hasNext) next().toInt & 0xFF else -1
override def read(b: Array[Byte], off: Int, len: Int): Int = {
if ((off < 0) || (len < 0) || (off + len > b.length)) throw new IndexOutOfBoundsException
@ -382,7 +382,7 @@ object ByteIterator {
def asInputStream: java.io.InputStream = new java.io.InputStream {
override def available: Int = current.len
def read: Int = if (hasNext) (next().toInt & 0xff) else -1
def read: Int = if (hasNext) next().toInt & 0xFF else -1
override def read(b: Array[Byte], off: Int, len: Int): Int = {
val nRead = current.asInputStream.read(b, off, len)
@ -523,9 +523,9 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
*/
def getShort(implicit byteOrder: ByteOrder): Short = {
if (byteOrder == ByteOrder.BIG_ENDIAN)
((next() & 0xff) << 8 | (next() & 0xff) << 0).toShort
((next() & 0xFF) << 8 | (next() & 0xFF) << 0).toShort
else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
((next() & 0xff) << 0 | (next() & 0xff) << 8).toShort
((next() & 0xFF) << 0 | (next() & 0xFF) << 8).toShort
else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}
@ -534,15 +534,15 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
*/
def getInt(implicit byteOrder: ByteOrder): Int = {
if (byteOrder == ByteOrder.BIG_ENDIAN)
((next() & 0xff) << 24
| (next() & 0xff) << 16
| (next() & 0xff) << 8
| (next() & 0xff) << 0)
((next() & 0xFF) << 24
| (next() & 0xFF) << 16
| (next() & 0xFF) << 8
| (next() & 0xFF) << 0)
else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
((next() & 0xff) << 0
| (next() & 0xff) << 8
| (next() & 0xff) << 16
| (next() & 0xff) << 24)
((next() & 0xFF) << 0
| (next() & 0xFF) << 8
| (next() & 0xFF) << 16
| (next() & 0xFF) << 24)
else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}
@ -551,23 +551,23 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
*/
def getLong(implicit byteOrder: ByteOrder): Long = {
if (byteOrder == ByteOrder.BIG_ENDIAN)
((next().toLong & 0xff) << 56
| (next().toLong & 0xff) << 48
| (next().toLong & 0xff) << 40
| (next().toLong & 0xff) << 32
| (next().toLong & 0xff) << 24
| (next().toLong & 0xff) << 16
| (next().toLong & 0xff) << 8
| (next().toLong & 0xff) << 0)
((next().toLong & 0xFF) << 56
| (next().toLong & 0xFF) << 48
| (next().toLong & 0xFF) << 40
| (next().toLong & 0xFF) << 32
| (next().toLong & 0xFF) << 24
| (next().toLong & 0xFF) << 16
| (next().toLong & 0xFF) << 8
| (next().toLong & 0xFF) << 0)
else if (byteOrder == ByteOrder.LITTLE_ENDIAN)
((next().toLong & 0xff) << 0
| (next().toLong & 0xff) << 8
| (next().toLong & 0xff) << 16
| (next().toLong & 0xff) << 24
| (next().toLong & 0xff) << 32
| (next().toLong & 0xff) << 40
| (next().toLong & 0xff) << 48
| (next().toLong & 0xff) << 56)
((next().toLong & 0xFF) << 0
| (next().toLong & 0xFF) << 8
| (next().toLong & 0xFF) << 16
| (next().toLong & 0xFF) << 24
| (next().toLong & 0xFF) << 32
| (next().toLong & 0xFF) << 40
| (next().toLong & 0xFF) << 48
| (next().toLong & 0xFF) << 56)
else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}
@ -578,11 +578,11 @@ abstract class ByteIterator extends BufferedIterator[Byte] {
def getLongPart(n: Int)(implicit byteOrder: ByteOrder): Long = {
if (byteOrder == ByteOrder.BIG_ENDIAN) {
var x = 0L
(1 to n).foreach(_ => x = (x << 8) | (next() & 0xff))
(1 to n).foreach(_ => x = (x << 8) | (next() & 0xFF))
x
} else if (byteOrder == ByteOrder.LITTLE_ENDIAN) {
var x = 0L
(0 until n).foreach(i => x |= (next() & 0xff) << 8 * i)
(0 until n).foreach(i => x |= (next() & 0xFF) << 8 * i)
x
} else throw new IllegalArgumentException("Unknown byte order " + byteOrder)
}

View file

@ -80,7 +80,6 @@ object ByteString {
* want wrap it into an ByteArray, and from there on only use that reference (the ByteString)
* to operate on the wrapped data. For all other intents and purposes, please use the usual
* apply and create methods - which provide the immutability guarantees by copying the array.
*
*/
def fromArrayUnsafe(array: Array[Byte]): ByteString = ByteString1C(array)
@ -105,7 +104,6 @@ object ByteString {
* want wrap it into an ByteArray, and from there on only use that reference (the ByteString)
* to operate on the wrapped data. For all other intents and purposes, please use the usual
* apply and create methods - which provide the immutability guarantees by copying the array.
*
*/
def fromArrayUnsafe(array: Array[Byte], offset: Int, length: Int): ByteString = ByteString1(array, offset, length)
@ -314,7 +312,7 @@ object ByteString {
os.write(bytes, startIndex, length)
}
def isCompact: Boolean = (length == bytes.length)
def isCompact: Boolean = length == bytes.length
private[akka] def byteStringCompanion = ByteString1

View file

@ -215,7 +215,6 @@ object AbstractActor {
* }
* }
* </pre>
*
*/
abstract class AbstractActor extends Actor {
@ -352,7 +351,6 @@ abstract class UntypedAbstractActor extends AbstractActor {
* Java API: compatible with lambda expressions
*
* Actor base class that mixes in logging into the Actor.
*
*/
abstract class AbstractLoggingActor extends AbstractActor with ActorLogging
@ -398,7 +396,6 @@ abstract class AbstractLoggingActor extends AbstractActor with ActorLogging
* For a `Stash` based actor that enforces unbounded deques see [[akka.actor.AbstractActorWithUnboundedStash]].
* There is also an unrestricted version [[akka.actor.AbstractActorWithUnrestrictedStash]] that does not
* enforce the mailbox type.
*
*/
abstract class AbstractActorWithStash extends AbstractActor with Stash
@ -408,7 +405,6 @@ abstract class AbstractActorWithStash extends AbstractActor with Stash
* Actor base class with `Stash` that enforces an unbounded deque for the actor. The proper mailbox has to be configured
* manually, and the mailbox should extend the [[akka.dispatch.DequeBasedMessageQueueSemantics]] marker trait.
* See [[akka.actor.AbstractActorWithStash]] for details on how `Stash` works.
*
*/
abstract class AbstractActorWithUnboundedStash extends AbstractActor with UnboundedStash
@ -417,6 +413,5 @@ abstract class AbstractActorWithUnboundedStash extends AbstractActor with Unboun
*
* Actor base class with `Stash` that does not enforce any mailbox type. The mailbox of the actor has to be configured
* manually. See [[akka.actor.AbstractActorWithStash]] for details on how `Stash` works.
*
*/
abstract class AbstractActorWithUnrestrictedStash extends AbstractActor with UnrestrictedStash

View file

@ -10,7 +10,6 @@ import akka.util.JavaDurationConverters._
/**
* Java API: compatible with lambda expressions
*
*/
object AbstractFSM {
@ -29,7 +28,6 @@ object AbstractFSM {
* Java API: compatible with lambda expressions
*
* Finite State Machine actor abstract base class.
*
*/
abstract class AbstractFSM[S, D] extends FSM[S, D] {
import java.util.{ List => JList }
@ -553,7 +551,6 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] {
* Java API: compatible with lambda expressions
*
* Finite State Machine actor abstract base class.
*
*/
abstract class AbstractLoggingFSM[S, D] extends AbstractFSM[S, D] with LoggingFSM[S, D]
@ -561,6 +558,5 @@ abstract class AbstractLoggingFSM[S, D] extends AbstractFSM[S, D] with LoggingFS
* Java API: compatible with lambda expressions
*
* Finite State Machine actor abstract base class with Stash support.
*
*/
abstract class AbstractFSMWithStash[S, D] extends AbstractFSM[S, D] with Stash

View file

@ -14,7 +14,6 @@ import akka.japi.Creator
import akka.util.Reflect
/**
*
* Java API: Factory for Props instances.
*/
private[akka] trait AbstractProps {

View file

@ -549,7 +549,8 @@ private[akka] class ActorCell(
currentMessage = null // reset current message after successful invocation
} catch handleNonFatalOrInterruptedException { e =>
handleInvokeFailure(Nil, e)
} finally
}
finally
// Schedule or reschedule receive timeout
checkReceiveTimeoutIfNeeded(msg, timeoutBeforeReceive)
}

View file

@ -89,13 +89,14 @@ object ActorPath {
// If the number of cases increase remember to add a `@switch` annotation e.g.:
// (findInvalidPathElementCharPosition(element): @switch) match {
(findInvalidPathElementCharPosition(element)) match {
findInvalidPathElementCharPosition(element) match {
case ValidPathCode =>
// valid
case EmptyPathCode =>
throw InvalidActorNameException(s"Actor path element must not be empty $fullPathMsg")
case invalidAt =>
throw InvalidActorNameException(s"""Invalid actor path element [$element]$fullPathMsg, illegal character [${element(
throw InvalidActorNameException(
s"""Invalid actor path element [$element]$fullPathMsg, illegal character [${element(
invalidAt)}] at position: $invalidAt. """ +
"""Actor paths MUST: """ +
"""not start with `$`, """ +
@ -130,7 +131,8 @@ object ActorPath {
case '%' if pos + 2 < len && isHexChar(s.charAt(pos + 1)) && isHexChar(s.charAt(pos + 2)) =>
validate(pos + 3)
case _ => pos
} else ValidPathCode
}
else ValidPathCode
if (len > 0 && s.charAt(0) != '$') validate(0) else 0
}

View file

@ -96,7 +96,6 @@ abstract class ActorSelection extends Serializable {
* if such an actor exists. It is completed with failure [[ActorNotFound]] if
* no such actor exists or the identification didn't complete within the
* supplied `timeout`.
*
*/
@deprecated("Use the overloaded method resolveOne which accepts java.time.Duration instead.", since = "2.5.20")
def resolveOneCS(timeout: FiniteDuration): CompletionStage[ActorRef] =
@ -110,7 +109,6 @@ abstract class ActorSelection extends Serializable {
* if such an actor exists. It is completed with failure [[ActorNotFound]] if
* no such actor exists or the identification didn't complete within the
* supplied `timeout`.
*
*/
@deprecated("Use the overloaded method resolveOne which accepts java.time.Duration instead.", since = "2.5.20")
def resolveOneCS(timeout: java.time.Duration): CompletionStage[ActorRef] = resolveOne(timeout)
@ -123,7 +121,6 @@ abstract class ActorSelection extends Serializable {
* if such an actor exists. It is completed with failure [[ActorNotFound]] if
* no such actor exists or the identification didn't complete within the
* supplied `timeout`.
*
*/
def resolveOne(timeout: java.time.Duration): CompletionStage[ActorRef] = {
import JavaDurationConverters._

View file

@ -935,7 +935,8 @@ private[akka] class ActorSystemImpl(
val scheduler: Scheduler = createScheduler()
val provider: ActorRefProvider = try {
val provider: ActorRefProvider =
try {
val arguments = Vector(
classOf[String] -> name,
classOf[Settings] -> settings,
@ -1026,7 +1027,8 @@ private[akka] class ActorSystemImpl(
"The calling code expected that the ActorSystem was initialized but it wasn't yet. " +
"This is probably a bug in the ActorSystem initialization sequence often related to initialization of extensions. " +
"Please report at https://github.com/akka/akka/issues.")
private lazy val _start: this.type = try {
private lazy val _start: this.type =
try {
registerOnTermination(stopScheduler())
// the provider is expected to start default loggers, LocalActorRefProvider does this

View file

@ -89,7 +89,8 @@ final case class Address private[akka] (protocol: String, system: String, host:
*/
def hostPort: String = toString.substring(protocol.length + 3)
/** INTERNAL API
/**
* INTERNAL API
* Check if the address is not created through `AddressFromURIString`, if there
* are any unusual characters in the host string.
*/

View file

@ -735,7 +735,8 @@ final class CoordinatedShutdown private[akka] (
val result = phaseDef.run(recoverEnabled)
val timeout = phases(phaseName).timeout
val deadline = Deadline.now + timeout
val timeoutFut = try {
val timeoutFut =
try {
after(timeout, system.scheduler) {
if (phaseName == CoordinatedShutdown.PhaseActorSystemTerminate && deadline.hasTimeLeft()) {
// too early, i.e. triggered by system termination

View file

@ -227,7 +227,7 @@ private[akka] class Deployer(val settings: ActorSystem.Settings, val dynamicAcce
.unwrapped
.asScala
.collect {
case (key, value: String) => (key -> value)
case (key, value: String) => key -> value
}
.toMap

View file

@ -91,7 +91,6 @@ trait ExtensionId[T <: Extension] {
* {{{
* override def get(system: ActorSystem): TheExtension = super.get(system)
* }}}
*
*/
def get(system: ActorSystem): T = apply(system)
@ -103,7 +102,6 @@ trait ExtensionId[T <: Extension] {
* {{{
* override def get(system: ClassicActorSystemProvider): TheExtension = super.get(system)
* }}}
*
*/
def get(system: ClassicActorSystemProvider): T = apply(system)

View file

@ -548,7 +548,7 @@ case class AllForOneStrategy(
children: Iterable[ChildRestartStats]): Unit = {
if (children.nonEmpty) {
if (restart && children.forall(_.requestRestartPermission(retriesWindow)))
children.foreach(crs => restartChild(crs.child, cause, suspendFirst = (crs.child != child)))
children.foreach(crs => restartChild(crs.child, cause, suspendFirst = crs.child != child))
else
for (c <- children) context.stop(c.child)
}

View file

@ -142,7 +142,8 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
}
override def isCancelled: Boolean = get == null
} catch {
}
catch {
case cause @ SchedulerException(msg) => throw new IllegalStateException(msg, cause)
}
}

View file

@ -25,11 +25,11 @@ import akka.annotation.DoNotInherit
class ReflectiveDynamicAccess(val classLoader: ClassLoader) extends DynamicAccess {
override def getClassFor[T: ClassTag](fqcn: String): Try[Class[_ <: T]] =
Try[Class[_ <: T]]({
Try[Class[_ <: T]] {
val c = Class.forName(fqcn, false, classLoader).asInstanceOf[Class[_ <: T]]
val t = implicitly[ClassTag[T]].runtimeClass
if (t.isAssignableFrom(c)) c else throw new ClassCastException(t.toString + " is not assignable from " + c)
})
}
override def createInstanceFor[T: ClassTag](clazz: Class[_], args: immutable.Seq[(Class[_], AnyRef)]): Try[T] =
Try {

View file

@ -112,7 +112,8 @@ trait Scheduler {
}
override def isCancelled: Boolean = get == null
} catch {
}
catch {
case SchedulerException(msg) => throw new IllegalStateException(msg)
}
}
@ -441,7 +442,8 @@ trait Scheduler {
implicit
executor: ExecutionContext,
sender: ActorRef = Actor.noSender): Cancellable =
scheduleOnce(delay, new Runnable {
scheduleOnce(delay,
new Runnable {
override def run(): Unit = receiver ! message
})

View file

@ -198,7 +198,8 @@ private[akka] trait StashSupport {
* if the `unstash()` call successfully returns or throws an exception.
*/
private[akka] def unstash(): Unit =
if (theStash.nonEmpty) try {
if (theStash.nonEmpty)
try {
enqueueFirst(theStash.head)
} finally {
theStash = theStash.tail

View file

@ -649,8 +649,6 @@ final case class TypedProps[T <: AnyRef] protected[TypedProps] (
/**
* Scala API: return a new TypedProps that will use the specified Timeout for its non-void-returning methods,
* if None is specified, it will use the default timeout as specified in the configuration.
*
*
*/
def withTimeout(timeout: Option[Timeout]): TypedProps[T] = this.copy(timeout = timeout)
@ -759,5 +757,6 @@ class TypedActorExtension(val system: ExtendedActorSystem) extends TypedActorFac
case handler: TypedActorInvocationHandler => handler
case _ => null
}
} else null
}
else null
}

View file

@ -272,8 +272,7 @@ private[akka] trait Children { this: ActorCell =>
if (oldInfo eq null)
Serialization.currentTransportInformation.value = system.provider.serializationInformation
props.args.forall(
arg =>
props.args.forall(arg =>
arg == null ||
arg.isInstanceOf[NoSerializationVerificationNeeded] ||
settings.NoSerializationVerificationNeededClassPrefix.exists(arg.getClass.getName.startsWith) || {

View file

@ -173,7 +173,8 @@ private[akka] object ChildrenContainer {
if (t.isEmpty) reason match {
case Termination => TerminatedChildrenContainer
case _ => NormalChildrenContainer(c - child.path.name)
} else copy(c - child.path.name, t)
}
else copy(c - child.path.name, t)
}
override def getByName(name: String): Option[ChildStats] = c.get(name)

View file

@ -29,7 +29,8 @@ private[akka] trait DeathWatch { this: ActorCell =>
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
updateWatching(a, None)
} else
}
else
checkWatchingSame(a, None)
}
a
@ -44,7 +45,8 @@ private[akka] trait DeathWatch { this: ActorCell =>
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
updateWatching(a, Some(msg))
} else
}
else
checkWatchingSame(a, Some(msg))
}
a

View file

@ -36,8 +36,7 @@ final case class SerializationCheckFailedException private[dungeon] (msg: Object
@InternalApi
private[akka] trait Dispatch { this: ActorCell =>
@nowarn @volatile private var _mailboxDoNotCallMeDirectly
: Mailbox = _ //This must be volatile since it isn't protected by the mailbox status
@nowarn @volatile private var _mailboxDoNotCallMeDirectly: Mailbox = _ // This must be volatile since it isn't protected by the mailbox status
@nowarn private def _preventPrivateUnusedErasure = {
_mailboxDoNotCallMeDirectly
@ -177,7 +176,8 @@ private[akka] trait Dispatch { this: ActorCell =>
if (system.settings.NoSerializationVerificationNeededClassPrefix.exists(msg.getClass.getName.startsWith))
envelope
else {
val deserializedMsg = try {
val deserializedMsg =
try {
serializeAndDeserializePayload(msg)
} catch {
case NonFatal(e) => throw SerializationCheckFailedException(msg, e)

View file

@ -95,7 +95,8 @@ private[akka] trait FaultHandling { this: ActorCell =>
} catch handleNonFatalOrInterruptedException { e =>
val ex = PreRestartException(self, e, cause, optionalMessage)
publish(Error(ex, self.path.toString, clazz(failedActor), e.getMessage))
} finally {
}
finally {
clearActorFields(failedActor, recreate = true)
}
}
@ -203,7 +204,8 @@ private[akka] trait FaultHandling { this: ActorCell =>
@InternalStableApi
final def handleInvokeFailure(childrenNotToSuspend: immutable.Iterable[ActorRef], t: Throwable): Unit = {
// prevent any further messages to be processed until the actor has been restarted
if (!isFailed) try {
if (!isFailed)
try {
suspendNonRecursive()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
@ -242,12 +244,18 @@ private[akka] trait FaultHandling { this: ActorCell =>
try if (a ne null) a.aroundPostStop()
catch handleNonFatalOrInterruptedException { e =>
publish(Error(e, self.path.toString, clazz(a), e.getMessage))
} finally try stopFunctionRefs()
finally try dispatcher.detach(this)
finally try parent.sendSystemMessage(
}
finally
try stopFunctionRefs()
finally
try dispatcher.detach(this)
finally
try parent.sendSystemMessage(
DeathWatchNotification(self, existenceConfirmed = true, addressTerminated = false))
finally try tellWatchersWeDied()
finally try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure
finally
try tellWatchersWeDied()
finally
try unwatchWatchedActors(a) // stay here as we expect an emergency stop from handleInvokeFailure
finally {
if (system.settings.DebugLifecycle)
publish(Debug(self.path.toString, clazz(a), "stopped"))
@ -272,8 +280,7 @@ private[akka] trait FaultHandling { this: ActorCell =>
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(freshActor), "restarted"))
// only after parent is up and running again do restart the children which were not stopped
survivors.foreach(
child =>
survivors.foreach(child =>
try child.asInstanceOf[InternalActorRef].restart(cause)
catch handleNonFatalOrInterruptedException { e =>
publish(Error(e, self.path.toString, clazz(freshActor), "restarting " + child))

View file

@ -70,7 +70,8 @@ private[akka] object MessageDispatcher {
// dispatcher debugging helper using println (see below)
// since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1)
final val debug = false // Deliberately without type ascription to make it a compile-time constant
lazy val actors = new Index[MessageDispatcher, ActorRef](16, new ju.Comparator[ActorRef] {
lazy val actors = new Index[MessageDispatcher, ActorRef](16,
new ju.Comparator[ActorRef] {
override def compare(a: ActorRef, b: ActorRef): Int = a.compareTo(b)
})
def printActors(): Unit =
@ -371,8 +372,8 @@ abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites:
.recover {
case exception =>
throw new IllegalArgumentException(
("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
make sure it has an accessible constructor with a [%s,%s] signature""")
"""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
make sure it has an accessible constructor with a [%s,%s] signature"""
.format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]),
exception)
}

View file

@ -20,7 +20,7 @@ import akka.annotation.InternalApi
import akka.annotation.InternalStableApi
import akka.compat
import akka.dispatch.internal.SameThreadExecutionContext
import akka.japi.{ Procedure, Function => JFunc, Option => JOption }
import akka.japi.{ Function => JFunc, Option => JOption, Procedure }
import akka.util.unused
/**

View file

@ -998,7 +998,8 @@ object BoundedControlAwareMailbox {
var remaining = pushTimeOut.toNanos
putLock.lockInterruptibly()
val inserted = try {
val inserted =
try {
var stop = false
while (size.get() == capacity && !stop) {
remaining = notFull.awaitNanos(remaining)

View file

@ -170,7 +170,6 @@ private[akka] class AffinityPool(
* due to an exception being thrown in user code, the worker is
* responsible for adding one more worker to compensate for its
* own termination
*
*/
private def onWorkerExit(w: AffinityPoolWorker, abruptTermination: Boolean): Unit =
bookKeepingLock.withGuard {
@ -410,7 +409,7 @@ private[akka] final class FairDistributionHashCache(val config: Config) extends
override def toString: String =
s"FairDistributionHashCache(fairDistributionThreshold = $fairDistributionThreshold)"
private[this] final def improve(h: Int): Int =
0x7FFFFFFF & (reverseBytes(h * 0x9e3775cd) * 0x9e3775cd) // `sbhash`: In memory of Phil Bagwell.
0x7FFFFFFF & (reverseBytes(h * 0x9E3775CD) * 0x9E3775CD) // `sbhash`: In memory of Phil Bagwell.
override final def getQueue(command: Runnable, queues: Int): Int = {
val runnableHash = command.hashCode()
if (fairDistributionThreshold == 0)

View file

@ -36,7 +36,6 @@ private[akka] object SystemMessageList {
}
/**
*
* INTERNAL API
*
* Value class supporting list operations on system messages. The `next` field of [[SystemMessage]]
@ -48,7 +47,6 @@ private[akka] object SystemMessageList {
*
* The type of the list also encodes that the messages contained are in reverse order, i.e. the head of the list is the
* latest appended element.
*
*/
private[akka] class LatestFirstSystemMessageList(val head: SystemMessage) extends AnyVal {
import SystemMessageList._
@ -97,7 +95,6 @@ private[akka] class LatestFirstSystemMessageList(val head: SystemMessage) extend
}
/**
*
* INTERNAL API
*
* Value class supporting list operations on system messages. The `next` field of [[SystemMessage]]
@ -109,7 +106,6 @@ private[akka] class LatestFirstSystemMessageList(val head: SystemMessage) extend
*
* This list type also encodes that the messages contained are in reverse order, i.e. the head of the list is the
* latest appended element.
*
*/
private[akka] class EarliestFirstSystemMessageList(val head: SystemMessage) extends AnyVal {
import SystemMessageList._

View file

@ -82,7 +82,8 @@ trait PredicateClassifier { this: EventBus =>
*/
trait LookupClassification { this: EventBus =>
protected final val subscribers = new Index[Classifier, Subscriber](mapSize(), new Comparator[Subscriber] {
protected final val subscribers = new Index[Classifier, Subscriber](mapSize(),
new Comparator[Subscriber] {
def compare(a: Subscriber, b: Subscriber): Int = compareSubscribers(a, b)
})
@ -297,7 +298,7 @@ trait ManagedActorClassification { this: ActorEventBus with ActorClassifier =>
/** The unsubscriber takes care of unsubscribing actors, which have terminated. */
protected lazy val unsubscriber =
ActorClassificationUnsubscriber.start(system, this.toString(), (this.unsubscribe: ActorRef => Unit))
ActorClassificationUnsubscriber.start(system, this.toString(), this.unsubscribe: ActorRef => Unit)
@tailrec
protected final def associate(monitored: ActorRef, monitor: ActorRef): Boolean = {

View file

@ -144,7 +144,8 @@ trait LoggingBus extends ActorEventBus {
try {
if (system.settings.DebugUnhandledMessage)
subscribe(
system.systemActorOf(Props(new Actor {
system.systemActorOf(
Props(new Actor {
def receive = {
case UnhandledMessage(msg, sender, rcp) =>
publish(Debug(rcp.path.toString, rcp.getClass, "unhandled message from " + sender + ": " + msg))
@ -202,7 +203,8 @@ trait LoggingBus extends ActorEventBus {
val actor = system.systemActorOf(Props(clazz).withDispatcher(system.settings.LoggersDispatcher), name)
implicit def timeout: Timeout = system.settings.LoggerStartTimeout
import akka.pattern.ask
val response = try Await.result(actor ? InitializeLogger(this), timeout.duration)
val response =
try Await.result(actor ? InitializeLogger(this), timeout.duration)
catch {
case _: TimeoutException =>
publish(

View file

@ -90,7 +90,8 @@ private[akka] object DirectByteBufferPool {
try if (bb.isDirect) {
val cleaner = cleanerMethod.invoke(bb)
cleanMethod.invoke(cleaner)
} catch { case NonFatal(_) => /* ok, best effort attempt to cleanup failed */ }
}
catch { case NonFatal(_) => /* ok, best effort attempt to cleanup failed */ }
}
} catch { case NonFatal(_) => _ => () /* reflection failed, use no-op fallback */ }

View file

@ -95,7 +95,8 @@ object Dns extends ExtensionId[DnsExt] with ExtensionIdProvider {
@deprecated("Use cached(DnsProtocol.Resolve)", "2.6.0")
def apply(newProtocol: DnsProtocol.Resolved): Resolved = {
Resolved(newProtocol.name, newProtocol.records.collect {
Resolved(newProtocol.name,
newProtocol.records.collect {
case r: ARecord => r.ip
case r: AAAARecord => r.ip
})

View file

@ -12,7 +12,6 @@ import akka.actor.Actor
* It is expected that this will be deprecated/removed in future Akka versions
*
* TODO make private and remove deprecated in 2.7.0
*
*/
@deprecated("Overriding the DNS implementation will be removed in future versions of Akka", "2.6.0")
trait DnsProvider {

View file

@ -133,7 +133,8 @@ private[io] object SelectionHandler {
if (cause.isInstanceOf[DeathPactException]) {
try context.system.eventStream.publish {
Logging.Debug(child.path.toString, getClass, "Closed after handler termination")
} catch { case NonFatal(_) => }
}
catch { case NonFatal(_) => }
} else super.logFailure(context, child, cause, decision)
}

Some files were not shown because too many files have changed in this diff Show more