Reformat with scalafmt 3.8.2
Executed command: scalafmt --non-interactive
This commit is contained in:
parent
ec2f01ea8c
commit
fa1351b213
22 changed files with 47 additions and 43 deletions
|
|
@ -273,12 +273,9 @@ class BackoffSupervisorSpec extends PekkoSpec with ImplicitSender with Eventuall
|
|||
val delayTable =
|
||||
Table(
|
||||
("restartCount", "minBackoff", "maxBackoff", "randomFactor", "expectedResult"),
|
||||
(0, 0.minutes, 0.minutes, 0d, 0.minutes),
|
||||
(0, 5.minutes, 7.minutes, 0d, 5.minutes),
|
||||
(2, 5.seconds, 7.seconds, 0d, 7.seconds),
|
||||
(2, 5.seconds, 7.days, 0d, 20.seconds),
|
||||
(29, 5.minutes, 10.minutes, 0d, 10.minutes),
|
||||
(29, 10000.days, 10000.days, 0d, 10000.days),
|
||||
(0, 0.minutes, 0.minutes, 0d, 0.minutes), (0, 5.minutes, 7.minutes, 0d, 5.minutes),
|
||||
(2, 5.seconds, 7.seconds, 0d, 7.seconds), (2, 5.seconds, 7.days, 0d, 20.seconds),
|
||||
(29, 5.minutes, 10.minutes, 0d, 10.minutes), (29, 10000.days, 10000.days, 0d, 10000.days),
|
||||
(Int.MaxValue, 10000.days, 10000.days, 0d, 10000.days))
|
||||
forAll(delayTable) {
|
||||
(
|
||||
|
|
|
|||
|
|
@ -32,7 +32,8 @@ class CircuitBreakerMTSpec extends PekkoSpec {
|
|||
def openBreaker(breaker: CircuitBreaker): Unit = {
|
||||
// returns true if the breaker is open
|
||||
def failingCall(): Boolean =
|
||||
Await.result(breaker.withCircuitBreaker(Future.failed(new RuntimeException("FAIL"))).recover {
|
||||
Await.result(
|
||||
breaker.withCircuitBreaker(Future.failed(new RuntimeException("FAIL"))).recover {
|
||||
case _: CircuitBreakerOpenException => true
|
||||
case _ => false
|
||||
}, remainingOrDefault)
|
||||
|
|
|
|||
|
|
@ -26,7 +26,8 @@ import pekko.testkit._
|
|||
|
||||
object TailChoppingSpec {
|
||||
def newActor(id: Int, sleepTime: Duration)(implicit system: ActorSystem) =
|
||||
system.actorOf(Props(new Actor {
|
||||
system.actorOf(
|
||||
Props(new Actor {
|
||||
var times: Int = _
|
||||
|
||||
def receive = {
|
||||
|
|
|
|||
|
|
@ -174,7 +174,8 @@ object DurableProducerQueue {
|
|||
override def equals(obj: Any): Boolean = {
|
||||
obj match {
|
||||
case other: MessageSent[_] =>
|
||||
seqNr == other.seqNr && message == other.message && ack == other.ack && confirmationQualifier == other.confirmationQualifier && timestampMillis == other.timestampMillis
|
||||
seqNr == other.seqNr && message == other.message && ack == other.ack && confirmationQualifier == other
|
||||
.confirmationQualifier && timestampMillis == other.timestampMillis
|
||||
case _ => false
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -711,7 +711,8 @@ private class ProducerControllerImpl[A: ClassTag](
|
|||
}
|
||||
|
||||
def receiveSendChunk(): Behavior[InternalCommand] = {
|
||||
if (s.remainingChunks.nonEmpty && s.remainingChunks.head.seqNr <= s.requestedSeqNr && s.storeMessageSentInProgress == 0) {
|
||||
if (s.remainingChunks.nonEmpty && s.remainingChunks.head.seqNr <= s.requestedSeqNr && s
|
||||
.storeMessageSentInProgress == 0) {
|
||||
if (traceEnabled)
|
||||
context.log.trace("Send next chunk seqNr [{}].", s.remainingChunks.head.seqNr)
|
||||
if (durableQueue.isEmpty) {
|
||||
|
|
|
|||
|
|
@ -194,7 +194,8 @@ private[pekko] class Mailboxes(
|
|||
|
||||
if (deploy.mailbox != Deploy.NoMailboxGiven) {
|
||||
verifyRequirements(lookup(deploy.mailbox))
|
||||
} else if (deploy.dispatcher != Deploy.NoDispatcherGiven && deploy.dispatcher != Deploy.DispatcherSameAsParent && hasMailboxType) {
|
||||
} else if (deploy.dispatcher != Deploy.NoDispatcherGiven && deploy.dispatcher != Deploy
|
||||
.DispatcherSameAsParent && hasMailboxType) {
|
||||
verifyRequirements(lookup(dispatcherConfig.getString("id")))
|
||||
} else if (hasRequiredType(actorClass)) {
|
||||
try verifyRequirements(lookupByQueueType(getRequiredType(actorClass)))
|
||||
|
|
|
|||
|
|
@ -103,7 +103,8 @@ class EventStream(sys: ActorSystem, private val debug: Boolean) extends LoggingB
|
|||
Logging.Debug(
|
||||
simpleName(this),
|
||||
this.getClass,
|
||||
"initialized unsubscriber to: " + unsubscriber + ", registering " + subscribers.size + " initial subscribers with it"))
|
||||
"initialized unsubscriber to: " + unsubscriber + ", registering " + subscribers
|
||||
.size + " initial subscribers with it"))
|
||||
subscribers.foreach(registerWithUnsubscriber)
|
||||
true
|
||||
} else {
|
||||
|
|
|
|||
|
|
@ -252,7 +252,8 @@ private[pekko] object AsyncDnsResolver {
|
|||
"""^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$""".r
|
||||
|
||||
private val ipv6Address =
|
||||
"""^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$""".r
|
||||
"""^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$"""
|
||||
.r
|
||||
|
||||
private[pekko] def isIpv4Address(name: String): Boolean =
|
||||
ipv4Address.findAllMatchIn(name).nonEmpty
|
||||
|
|
|
|||
|
|
@ -70,16 +70,9 @@ class ClusterMetricsExtensionSpec
|
|||
val history = metricsView.metricsHistory.reverse.map { _.head }
|
||||
|
||||
val expected = List(
|
||||
(0.700, 0.000, 0.000),
|
||||
(0.700, 0.018, 0.007),
|
||||
(0.700, 0.051, 0.020),
|
||||
(0.700, 0.096, 0.038),
|
||||
(0.700, 0.151, 0.060),
|
||||
(0.700, 0.214, 0.085),
|
||||
(0.700, 0.266, 0.106),
|
||||
(0.700, 0.309, 0.123),
|
||||
(0.700, 0.343, 0.137),
|
||||
(0.700, 0.372, 0.148))
|
||||
(0.700, 0.000, 0.000), (0.700, 0.018, 0.007), (0.700, 0.051, 0.020), (0.700, 0.096, 0.038),
|
||||
(0.700, 0.151, 0.060), (0.700, 0.214, 0.085), (0.700, 0.266, 0.106), (0.700, 0.309, 0.123),
|
||||
(0.700, 0.343, 0.137), (0.700, 0.372, 0.148))
|
||||
|
||||
expected.size should ===(sampleCount)
|
||||
|
||||
|
|
|
|||
|
|
@ -504,7 +504,8 @@ private class ShardingProducerControllerImpl[A: ClassTag](
|
|||
s.out.flatMap {
|
||||
case (outKey: OutKey, outState) =>
|
||||
val idleDurationMillis = (now - outState.usedNanoTime) / 1000 / 1000
|
||||
if (outState.unconfirmed.isEmpty && outState.buffered.isEmpty && idleDurationMillis >= settings.cleanupUnusedAfter.toMillis) {
|
||||
if (outState.unconfirmed.isEmpty && outState.buffered.isEmpty && idleDurationMillis >= settings
|
||||
.cleanupUnusedAfter.toMillis) {
|
||||
context.log.debug("Cleanup unused [{}], because it was idle for [{} ms]", outKey, idleDurationMillis)
|
||||
context.stop(outState.producerController)
|
||||
Some(outKey)
|
||||
|
|
|
|||
|
|
@ -1651,7 +1651,8 @@ private[pekko] class DDataShardCoordinator(
|
|||
updateStateRetries += 1
|
||||
|
||||
val template =
|
||||
s"$typeName: The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': ${stateWriteConsistency.timeout.toMillis} millis (${if (terminating) "terminating"
|
||||
s"$typeName: The ShardCoordinator was unable to update a distributed state within 'updating-state-timeout': ${stateWriteConsistency
|
||||
.timeout.toMillis} millis (${if (terminating) "terminating"
|
||||
else "retrying"}). Attempt $updateStateRetries. " +
|
||||
s"Perhaps the ShardRegion has not started on all active nodes yet? event=$evt"
|
||||
|
||||
|
|
|
|||
|
|
@ -58,7 +58,8 @@ import pekko.util.Timeout
|
|||
|
||||
def withState(
|
||||
subscribeAdapters: Map[
|
||||
ActorRef[JReplicator.SubscribeResponse[ReplicatedData]], ActorRef[dd.Replicator.SubscribeResponse[
|
||||
ActorRef[JReplicator.SubscribeResponse[ReplicatedData]],
|
||||
ActorRef[dd.Replicator.SubscribeResponse[
|
||||
ReplicatedData]]]): Behavior[SReplicator.Command] = {
|
||||
|
||||
def stopSubscribeAdapter(
|
||||
|
|
|
|||
|
|
@ -882,7 +882,8 @@ private[cluster] class ClusterCoreDaemon(publisher: ActorRef, joinConfigCompatCh
|
|||
def leaving(address: Address): Unit = {
|
||||
// only try to update if the node is available (in the member ring)
|
||||
latestGossip.members.find(_.address == address).foreach { existingMember =>
|
||||
if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember.status == Up || existingMember.status == PreparingForShutdown || existingMember.status == ReadyForShutdown) {
|
||||
if (existingMember.status == Joining || existingMember.status == WeaklyUp || existingMember
|
||||
.status == Up || existingMember.status == PreparingForShutdown || existingMember.status == ReadyForShutdown) {
|
||||
// mark node as LEAVING
|
||||
val newMembers = latestGossip.members - existingMember + existingMember.copy(status = Leaving)
|
||||
val newGossip = latestGossip.copy(members = newMembers)
|
||||
|
|
|
|||
|
|
@ -122,7 +122,8 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem)
|
|||
override def currentChanges(tag: String, offset: Offset): Source[DurableStateChange[A], pekko.NotUsed] =
|
||||
this.synchronized {
|
||||
val currentGlobalOffset = lastGlobalOffset.get()
|
||||
changes(tag, offset).takeWhile(_.offset match {
|
||||
changes(tag, offset).takeWhile(
|
||||
_.offset match {
|
||||
case Sequence(fromOffset) =>
|
||||
fromOffset < currentGlobalOffset
|
||||
case offset =>
|
||||
|
|
@ -137,7 +138,8 @@ class PersistenceTestKitDurableStateStore[A](val system: ExtendedActorSystem)
|
|||
offset: Offset): Source[DurableStateChange[A], NotUsed] =
|
||||
this.synchronized {
|
||||
val currentGlobalOffset = lastGlobalOffset.get()
|
||||
changesBySlices(entityType, minSlice, maxSlice, offset).takeWhile(_.offset match {
|
||||
changesBySlices(entityType, minSlice, maxSlice, offset).takeWhile(
|
||||
_.offset match {
|
||||
case Sequence(fromOffset) =>
|
||||
fromOffset < currentGlobalOffset
|
||||
case offset =>
|
||||
|
|
|
|||
|
|
@ -138,7 +138,8 @@ private[pekko] class ReplayFilter(
|
|||
if (msg.persistent.sequenceNr >= seqNo) {
|
||||
val errMsg =
|
||||
s"Invalid replayed event [sequenceNr=${r.persistent.sequenceNr}, writerUUID=${r.persistent.writerUuid}] from a new writer. " +
|
||||
s"An older writer already sent an event [sequenceNr=${msg.persistent.sequenceNr}, writerUUID=${msg.persistent.writerUuid}] whose sequence number was equal or greater for the same persistenceId [${r.persistent.persistenceId}]. " +
|
||||
s"An older writer already sent an event [sequenceNr=${msg.persistent.sequenceNr}, writerUUID=${msg.persistent
|
||||
.writerUuid}] whose sequence number was equal or greater for the same persistenceId [${r.persistent.persistenceId}]. " +
|
||||
"Perhaps, the new writer journaled the event out of sequence, or duplicate persistenceId for different entities?"
|
||||
logIssue(errMsg)
|
||||
mode match {
|
||||
|
|
|
|||
|
|
@ -248,7 +248,8 @@ object MultiJvmPlugin extends AutoPlugin {
|
|||
.foreach(classpathFile =>
|
||||
IO.copyFile(classpathFile, new File(multiRunCopiedClassDir, classpathFile.getName), true))
|
||||
val cp =
|
||||
directoryBasedClasspathEntries.absString + File.pathSeparator + multiRunCopiedClassDir.getAbsolutePath + File.separator + "*"
|
||||
directoryBasedClasspathEntries.absString + File.pathSeparator + multiRunCopiedClassDir.getAbsolutePath + File
|
||||
.separator + "*"
|
||||
(testClass: String) => { Seq("-cp", cp, runner, "-s", testClass) ++ options }
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -187,12 +187,8 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
|||
.map(_.replaceAll("Mat$", ""))
|
||||
.map(method => (element, method))
|
||||
} ++ List(
|
||||
(noElement, "Partition"),
|
||||
(noElement, "MergeSequence"),
|
||||
(noElement, "Broadcast"),
|
||||
(noElement, "Balance"),
|
||||
(noElement, "Unzip"),
|
||||
(noElement, "UnzipWith"))
|
||||
(noElement, "Partition"), (noElement, "MergeSequence"), (noElement, "Broadcast"), (noElement, "Balance"),
|
||||
(noElement, "Unzip"), (noElement, "UnzipWith"))
|
||||
|
||||
val sourceAndFlow =
|
||||
defs.collect { case ("Source", method) => method }.intersect(defs.collect { case ("Flow", method) => method })
|
||||
|
|
|
|||
|
|
@ -912,7 +912,8 @@ private[remote] class EndpointWriter(
|
|||
|
||||
if (pduSize > transport.maximumPayloadBytes) {
|
||||
val reasonText =
|
||||
s"Discarding oversized payload sent to ${s.recipient}: max allowed size ${transport.maximumPayloadBytes} bytes, actual size of encoded ${s.message.getClass} was ${pdu.size} bytes."
|
||||
s"Discarding oversized payload sent to ${s.recipient}: max allowed size ${transport
|
||||
.maximumPayloadBytes} bytes, actual size of encoded ${s.message.getClass} was ${pdu.size} bytes."
|
||||
log.error(
|
||||
new OversizedPayloadException(reasonText),
|
||||
"Transient association error (association remains live)")
|
||||
|
|
|
|||
|
|
@ -817,8 +817,7 @@ class LinearTraversalBuilderSpec extends PekkoSpec {
|
|||
|
||||
mat.islandAssignments should ===(
|
||||
List(
|
||||
(sink, Attributes.none, TestDefaultIsland),
|
||||
(flow2, Attributes.none, TestDefaultIsland),
|
||||
(sink, Attributes.none, TestDefaultIsland), (flow2, Attributes.none, TestDefaultIsland),
|
||||
(flow1, Attributes.name("island2"), TestIsland2),
|
||||
(source, Attributes.name("island2") and Attributes.name("island1"), TestIsland1)))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -50,7 +50,8 @@ import pekko.util.ByteString
|
|||
val halfClose: Boolean,
|
||||
val idleTimeout: Duration,
|
||||
val bindShutdownTimeout: FiniteDuration)
|
||||
extends GraphStageWithMaterializedValue[SourceShape[StreamTcp.IncomingConnection], Future[
|
||||
extends GraphStageWithMaterializedValue[SourceShape[StreamTcp.IncomingConnection],
|
||||
Future[
|
||||
StreamTcp.ServerBinding]] {
|
||||
import ConnectionSourceStage._
|
||||
|
||||
|
|
|
|||
|
|
@ -546,7 +546,8 @@ object Source {
|
|||
*/
|
||||
@deprecated("Use variant accepting completion and failure matchers", "Akka 2.6.0")
|
||||
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
|
||||
new Source(scaladsl.Source.actorRef({
|
||||
new Source(scaladsl.Source.actorRef(
|
||||
{
|
||||
case pekko.actor.Status.Success(s: CompletionStrategy) => s
|
||||
case pekko.actor.Status.Success(_) => CompletionStrategy.Draining
|
||||
case pekko.actor.Status.Success => CompletionStrategy.Draining
|
||||
|
|
|
|||
|
|
@ -710,7 +710,8 @@ object Source {
|
|||
*/
|
||||
@deprecated("Use variant accepting completion and failure matchers instead", "Akka 2.6.0")
|
||||
def actorRef[T](bufferSize: Int, overflowStrategy: OverflowStrategy): Source[T, ActorRef] =
|
||||
actorRef({
|
||||
actorRef(
|
||||
{
|
||||
case pekko.actor.Status.Success(s: CompletionStrategy) => s
|
||||
case pekko.actor.Status.Success(_) => CompletionStrategy.Draining
|
||||
case pekko.actor.Status.Success => CompletionStrategy.Draining
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue