Make use of eq and ne. (#1994)

* chore: make use of eq instead of ==

* chore: make use of ne instead of !=
This commit is contained in:
He-Pin(kerr) 2025-08-03 17:32:32 +08:00 committed by GitHub
parent 19788583ee
commit 7325c729ce
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
84 changed files with 158 additions and 158 deletions

View file

@ -71,7 +71,7 @@ import pekko.testkit.TestKit
}
private def messageOrEmpty(event: LoggingEvent): String =
if (event.message == null) "" else event.message
if (event.message eq null) "" else event.message
private def sourceOrEmpty(event: LoggingEvent): String =
event.mdc.getOrElse("pekkoSource", "")

View file

@ -67,7 +67,7 @@ object SpawnProtocol {
msg match {
case Spawn(bhvr: Behavior[t], name, props, replyTo) =>
val ref =
if (name == null || name.equals(""))
if ((name eq null) || name.equals(""))
ctx.spawnAnonymous(bhvr, props)
else {

View file

@ -285,7 +285,7 @@ import scala.util.Success
future: CompletionStage[Value],
applyToResult: pekko.japi.function.Function2[Value, Throwable, T]): Unit = {
future.handle[Unit] { (value, ex) =>
if (ex != null)
if (ex ne null)
self.unsafeUpcast ! AdaptMessage(ex, applyToResult.apply(null.asInstanceOf[Value], _: Throwable))
else self.unsafeUpcast ! AdaptMessage(value, applyToResult.apply(_: Value, null))
}

View file

@ -66,7 +66,7 @@ import pekko.annotation.InternalApi
}
final case class ActorTagsImpl(tags: Set[String], next: Props = Props.empty) extends ActorTags {
if (tags == null)
if (tags eq null)
throw new IllegalArgumentException("Tags must not be null")
def withNext(next: Props): Props = copy(next = next)
}

View file

@ -144,7 +144,7 @@ import java.util.function.Predicate
override def exists(predicate: T => Boolean): Boolean = {
var hasElement = false
var node = _first
while (node != null && !hasElement) {
while ((node ne null) && !hasElement) {
hasElement = predicate(node.message)
node = node.next
}

View file

@ -403,12 +403,12 @@ object Behaviors {
behavior: Behavior[T]): Behavior[T] = {
def asScalaMap(m: java.util.Map[String, String]): Map[String, String] = {
if (m == null || m.isEmpty) Map.empty[String, String]
if ((m eq null) || m.isEmpty) Map.empty[String, String]
else m.asScala.toMap
}
val mdcForMessageFun: T => Map[String, String] =
if (mdcForMessage == null) _ => Map.empty
if (mdcForMessage eq null) _ => Map.empty
else { message =>
asScalaMap(mdcForMessage.apply(message))
}

View file

@ -89,7 +89,7 @@ final case class Identify(messageId: Any) extends AutoReceivedMessage with NotIn
*/
@SerialVersionUID(1L)
final case class ActorIdentity(correlationId: Any, ref: Option[ActorRef]) {
if (ref.isDefined && ref.get == null) {
if (ref.isDefined && (ref.get eq null)) {
throw new IllegalArgumentException(
"ActorIdentity created with ref = Some(null) is not allowed, " +
"this could happen when serializing with Scala 2.12 and deserializing with Scala 2.11 which is not supported.")
@ -201,7 +201,7 @@ class ActorInitializationException protected (actor: ActorRef, message: String,
}
object ActorInitializationException {
private def enrichedMessage(actor: ActorRef, message: String) =
if (actor == null) message else s"${actor.path}: $message"
if (actor eq null) message else s"${actor.path}: $message"
private[pekko] def apply(actor: ActorRef, message: String, cause: Throwable = null): ActorInitializationException =
new ActorInitializationException(actor, message, cause)
private[pekko] def apply(message: String): ActorInitializationException =
@ -230,7 +230,7 @@ final case class PreRestartException private[pekko] (
extends ActorInitializationException(
actor,
"exception in preRestart(" +
(if (originalCause == null) "null" else originalCause.getClass) + ", " +
(if (originalCause eq null) "null" else originalCause.getClass) + ", " +
(messageOption match { case Some(m: AnyRef) => m.getClass; case _ => "None" }) +
")",
cause)
@ -247,7 +247,7 @@ final case class PreRestartException private[pekko] (
final case class PostRestartException private[pekko] (actor: ActorRef, cause: Throwable, originalCause: Throwable)
extends ActorInitializationException(
actor,
"exception post restart (" + (if (originalCause == null) "null" else originalCause.getClass) + ")",
"exception post restart (" + (if (originalCause eq null) "null" else originalCause.getClass) + ")",
cause)
/**

View file

@ -641,7 +641,7 @@ private[pekko] class ActorCell(
protected def create(failure: Option[ActorInitializationException]): Unit = {
def failActor(): Unit =
if (_actor != null) {
if (_actor ne null) {
clearActorFields(actor, recreate = false)
_actor = null // ensure that we know that we failed during creation
}
@ -684,7 +684,7 @@ private[pekko] class ActorCell(
@tailrec
private def rootCauseOf(throwable: Throwable): Throwable = {
if (throwable.getCause != null && throwable.getCause != throwable)
if ((throwable.getCause ne null) && throwable.getCause != throwable)
rootCauseOf(throwable.getCause)
else
throwable

View file

@ -188,15 +188,15 @@ object AddressFromURIString {
def unapply(uri: URI): Option[Address] =
if (uri eq null) None
else if (uri.getScheme == null || (uri.getUserInfo == null && uri.getHost == null)) None
else if (uri.getUserInfo == null) { // case 1: pekko://system
else if ((uri.getScheme eq null) || (uri.getUserInfo == null && (uri.getHost eq null))) None
else if (uri.getUserInfo eq null) { // case 1: pekko://system
if (uri.getPort != -1) None
else Some(Address(uri.getScheme, uri.getHost))
} else { // case 2: pekko://system@host:port
if (uri.getHost == null || uri.getPort == -1) None
if ((uri.getHost eq null) || uri.getPort == -1) None
else
Some(
if (uri.getUserInfo == null) Address(uri.getScheme, uri.getHost)
if (uri.getUserInfo eq null) Address(uri.getScheme, uri.getHost)
else Address(uri.getScheme, uri.getUserInfo, uri.getHost, uri.getPort))
}

View file

@ -696,14 +696,14 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
* @see [[#startWith]]
*/
final def initialize(): Unit =
if (currentState != null) makeTransition(currentState)
if (currentState ne null) makeTransition(currentState)
else throw new IllegalStateException("You must call `startWith` before calling `initialize`")
/**
* Return current state name (i.e. object of type S)
*/
final def stateName: S = {
if (currentState != null) currentState.stateName
if (currentState ne null) currentState.stateName
else throw new IllegalStateException("You must call `startWith` before using `stateName`")
}
@ -711,7 +711,7 @@ trait FSM[S, D] extends Actor with Listeners with ActorLogging {
* Return current state data (i.e. object of type D)
*/
final def stateData: D =
if (currentState != null) currentState.stateData
if (currentState ne null) currentState.stateData
else throw new IllegalStateException("You must call `startWith` before using `stateData`")
/**

View file

@ -144,7 +144,7 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
try {
runnable.run()
val driftNanos = clock() - getAndAdd(delay.toNanos)
if (self.get() != null)
if (self.get() ne null)
swap(schedule(executor, this, Duration.fromNanos(Math.max(delay.toNanos - driftNanos, 1))))
} catch {
case _: SchedulerException => // ignore failure to enqueue or terminated target actor
@ -204,10 +204,10 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
private def schedule(ec: ExecutionContext, r: Runnable, delay: FiniteDuration): TimerTask =
if (delay.length <= 0L) { // use simple comparison instead of Ordering for performance
if (stopped.get != null) throw SchedulerException("cannot enqueue after timer shutdown")
if (stopped.get ne null) throw SchedulerException("cannot enqueue after timer shutdown")
ec.execute(r)
NotCancellable
} else if (stopped.get != null) {
} else if (stopped.get ne null) {
throw SchedulerException("cannot enqueue after timer shutdown")
} else {
val delayNanos = delay.toNanos
@ -216,7 +216,7 @@ class LightArrayRevolverScheduler(config: Config, log: LoggingAdapter, threadFac
val ticks = (delayNanos / tickNanos).toInt
val task = new TaskHolder(r, ticks, ec)
queue.add(task)
if (stopped.get != null && task.cancel())
if ((stopped.get ne null) && task.cancel())
throw SchedulerException("cannot enqueue after timer shutdown")
task
}

View file

@ -89,12 +89,12 @@ trait Scheduler {
override def run(): Unit = {
try {
runnable.run()
if (get != null)
if (get ne null)
swap(scheduleOnce(delay, this))
} catch {
// ignore failure to enqueue or terminated target actor
case _: SchedulerException =>
case e: IllegalStateException if e.getCause != null && e.getCause.isInstanceOf[SchedulerException] =>
case _: SchedulerException =>
case e: IllegalStateException if (e.getCause ne null) && e.getCause.isInstanceOf[SchedulerException] =>
}
}
@ -541,7 +541,7 @@ object Scheduler {
@tailrec final protected def swap(c: Cancellable): Unit = {
get match {
case null => if (c != null) c.cancel()
case null => if (c ne null) c.cancel()
case old =>
if (!compareAndSet(old, c))
swap(c)

View file

@ -93,7 +93,7 @@ private[pekko] trait Dispatch { this: ActorCell =>
val req = system.mailboxes.getRequiredType(actorClass)
if (req.isInstance(mbox.messageQueue)) Create(None)
else {
val gotType = if (mbox.messageQueue == null) "null" else mbox.messageQueue.getClass.getName
val gotType = if (mbox.messageQueue eq null) "null" else mbox.messageQueue.getClass.getName
Create(Some(ActorInitializationException(self, s"Actor [$self] requires mailbox type [$req] got [$gotType]")))
}
case _ => Create(None)

View file

@ -90,7 +90,7 @@ private[pekko] trait FaultHandling { this: ActorCell =>
* Do re-create the actor in response to a failure.
*/
protected def faultRecreate(cause: Throwable): Unit =
if (actor == null) {
if (actor eq null) {
system.eventStream.publish(
Error(self.path.toString, clazz(actor), "changing Recreate into Create after " + cause))
faultCreate()
@ -134,11 +134,11 @@ private[pekko] trait FaultHandling { this: ActorCell =>
* prompted this action.
*/
protected def faultResume(causedByFailure: Throwable): Unit = {
if (actor == null) {
if (actor eq null) {
system.eventStream.publish(
Error(self.path.toString, clazz(actor), "changing Resume into Create after " + causedByFailure))
faultCreate()
} else if (isFailedFatally && causedByFailure != null) {
} else if (isFailedFatally && (causedByFailure ne null)) {
system.eventStream.publish(
Error(self.path.toString, clazz(actor), "changing Resume into Restart after " + causedByFailure))
faultRecreate(causedByFailure)
@ -147,7 +147,7 @@ private[pekko] trait FaultHandling { this: ActorCell =>
// done always to keep that suspend counter balanced
// must happen atomically
try resumeNonRecursive()
finally if (causedByFailure != null) clearFailed()
finally if (causedByFailure ne null) clearFailed()
resumeChildren(causedByFailure, perp)
}
}
@ -331,7 +331,7 @@ private[pekko] trait FaultHandling { this: ActorCell =>
* otherwise tell the supervisor etc. (in that second case, the match
* below will hit the empty default case, too)
*/
if (actor != null) {
if (actor ne null) {
try actor.supervisorStrategy.handleChildTerminated(this, child, children)
catch handleNonFatalOrInterruptedException { e =>
publish(Error(e, self.path.toString, clazz(actor), "handleChildTerminated failed"))

View file

@ -85,7 +85,7 @@ private[dispatch] object VirtualThreadSupport {
val ofVirtualClass = ClassLoader.getSystemClassLoader.loadClass("java.lang.Thread$Builder$OfVirtual")
val ofVirtualMethod = classOf[Thread].getDeclaredMethod("ofVirtual")
var builder = ofVirtualMethod.invoke(null)
if (executor != null) {
if (executor ne null) {
val clazz = builder.getClass
val field = clazz.getDeclaredField("scheduler")
field.setAccessible(true)

View file

@ -49,7 +49,7 @@ class EventStream(sys: ActorSystem, private val debug: Boolean) extends LoggingB
protected def classify(event: Any): Class[_] = event.getClass
protected def publish(event: Any, subscriber: ActorRef) = {
if (sys == null && subscriber.isTerminated) unsubscribe(subscriber)
if ((sys eq null) && subscriber.isTerminated) unsubscribe(subscriber)
else subscriber ! event
}

View file

@ -1647,7 +1647,7 @@ trait DiagnosticLoggingAdapter extends LoggingAdapter {
* These values can be used in PatternLayout when `org.apache.pekko.event.slf4j.Slf4jLogger` is configured.
* Visit <a href="https://logback.qos.ch/manual/mdc.html">Logback Docs: MDC</a> for more information.
*/
def mdc(mdc: MDC): Unit = _mdc = if (mdc != null) mdc else emptyMDC
def mdc(mdc: MDC): Unit = _mdc = if (mdc ne null) mdc else emptyMDC
/**
* Java API:
@ -1675,7 +1675,7 @@ trait DiagnosticLoggingAdapter extends LoggingAdapter {
* These values can be used in PatternLayout when `org.apache.pekko.event.slf4j.Slf4jLogger` is configured.
* Visit <a href="https://logback.qos.ch/manual/mdc.html">Logback Docs: MDC</a> for more information.
*/
def setMDC(jMdc: java.util.Map[String, Any]): Unit = mdc(if (jMdc != null) jMdc.asScala.toMap else emptyMDC)
def setMDC(jMdc: java.util.Map[String, Any]): Unit = mdc(if (jMdc ne null) jMdc.asScala.toMap else emptyMDC)
/**
* Clear all entries in the MDC

View file

@ -57,7 +57,7 @@ private[pekko] class DirectByteBufferPool(defaultBufferSize: Int, maxPoolEntries
}
// allocate new and clear outside the lock
if (buffer == null)
if (buffer eq null)
allocate(defaultBufferSize)
else {
buffer.clear()

View file

@ -484,9 +484,9 @@ object Tcp extends ExtensionId[TcpExt] with ExtensionIdProvider {
_cause
.map(t => {
val msg =
if (t.getCause == null)
if (t.getCause eq null)
t.getMessage
else if (t.getCause.getCause == null)
else if (t.getCause.getCause eq null)
s"${t.getMessage}, caused by: ${t.getCause}"
else
s"${t.getMessage}, caused by: ${t.getCause}, caused by: ${t.getCause.getCause}"

View file

@ -366,7 +366,7 @@ private[io] abstract class TcpConnection(val tcp: TcpExt, val channel: SocketCha
}
@tailrec private[this] def extractMsg(t: Throwable): String =
if (t == null) "unknown"
if (t eq null) "unknown"
else {
t.getMessage match {
case null | "" => extractMsg(t.getCause)

View file

@ -140,7 +140,7 @@ private[io] class TcpListener(
case NonFatal(e) => { log.error(e, "Accept error: could not accept new connection"); null }
}
} else null
if (socketChannel != null) {
if (socketChannel ne null) {
log.debug("New connection accepted")
socketChannel.configureBlocking(false)
def props(registry: ChannelRegistry) =

View file

@ -170,7 +170,7 @@ private[io] class UdpConnection(
}
override def postStop(): Unit =
if (channel != null && channel.isOpen) {
if ((channel ne null) && channel.isOpen) {
log.debug("Closing DatagramChannel after being stopped")
try channel.close()
catch {

View file

@ -222,7 +222,7 @@ object DnsSettings {
val ctx = new InitialDirContext(env)
val dnsUrls = ctx.getEnvironment.get("java.naming.provider.url").asInstanceOf[String]
// Only try if not empty as otherwise we will produce an exception
if (dnsUrls != null && !dnsUrls.isEmpty) {
if ((dnsUrls ne null) && !dnsUrls.isEmpty) {
val servers = dnsUrls.split(" ")
servers.flatMap { server =>
asInetSocketAddress(server).toOption

View file

@ -420,7 +420,7 @@ final class ExplicitlyAskableActorRef(val actorRef: ActorRef) extends AnyVal {
s"question not sent to [$actorRef]."))
case _ =>
val message =
if (sender == null) null else messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters)
if (sender eq null) null else messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters)
Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorRef, message, sender))
}
}
@ -499,7 +499,7 @@ final class ExplicitlyAskableActorSelection(val actorSel: ActorSelection) extend
s"question not sent to [$actorSel]."))
case _ =>
val message =
if (sender == null) null else messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters)
if (sender eq null) null else messageFactory(sender.asInstanceOf[InternalActorRef].provider.deadLetters)
Future.failed[Any](AskableActorRef.unsupportedRecipientType(actorSel, message, sender))
}
}

View file

@ -757,7 +757,7 @@ class CircuitBreaker(
private def isIgnoredException(ex: Any): Boolean =
allowExceptions.nonEmpty && (ex match {
case ce: CompletionException => ce.getCause != null && allowExceptions.contains(ce.getCause.getClass.getName)
case ce: CompletionException => (ce.getCause ne null) && allowExceptions.contains(ce.getCause.getClass.getName)
case _ => allowExceptions.contains(ex.getClass.getName)
})

View file

@ -78,7 +78,7 @@ trait FutureTimeoutSupport {
val future = value
future.handle[Unit]((t: T, ex: Throwable) => {
if (t != null) p.complete(t)
if (ex != null) p.completeExceptionally(ex)
if (ex ne null) p.completeExceptionally(ex)
})
} catch {
case NonFatal(ex) => p.completeExceptionally(ex)
@ -142,7 +142,7 @@ trait FutureTimeoutSupport {
stage.handle[Unit]((v: T, ex: Throwable) => {
timeout.cancel()
if (v != null) p.complete(v)
if (ex != null) p.completeExceptionally(ex)
if (ex ne null) p.completeExceptionally(ex)
})
p
}

View file

@ -57,13 +57,13 @@ trait PipeToSupport {
def pipeTo(recipient: ActorRef)(implicit sender: ActorRef = Actor.noSender): CompletionStage[T] = {
future.whenComplete((t: T, ex: Throwable) => {
if (t != null) recipient ! t
if (ex != null) recipient ! Status.Failure(ex)
if (ex ne null) recipient ! Status.Failure(ex)
})
}
def pipeToSelection(recipient: ActorSelection)(implicit sender: ActorRef = Actor.noSender): CompletionStage[T] = {
future.whenComplete((t: T, ex: Throwable) => {
if (t != null) recipient ! t
if (ex != null) recipient ! Status.Failure(ex)
if (ex ne null) recipient ! Status.Failure(ex)
})
}
def to(recipient: ActorRef): PipeableCompletionStage[T] = to(recipient, Actor.noSender)

View file

@ -35,7 +35,7 @@ import pekko.dispatch.ExecutionContexts
* @tparam T the type of value a successful reply would have
*/
final class StatusReply[+T] private (private val status: Try[T]) {
if (status == null)
if (status eq null)
throw InvalidMessageException("[null] is not an allowed status")
/**
@ -130,7 +130,7 @@ object StatusReply {
*/
def apply[T](value: T): StatusReply[T] = new StatusReply(ScalaSuccess(value))
def unapply(status: StatusReply[Any]): Option[Any] =
if (status != null && status.isSuccess) Some(status.getValue)
if ((status ne null) && status.isSuccess) Some(status.getValue)
else None
}
@ -162,7 +162,7 @@ object StatusReply {
*/
def apply[T](exception: Throwable): StatusReply[T] = new StatusReply(ScalaFailure(exception))
def unapply(status: StatusReply[_]): Option[Throwable] =
if (status != null && status.isError) Some(status.getError)
if ((status ne null) && status.isError) Some(status.getError)
else None
}

View file

@ -177,7 +177,7 @@ final case class ConsistentHashingRoutingLogic(
// If defaultAddress is not available the message will not be routed, but new attempt
// is performed for next message.
val a = ConsistentHashingRoutingLogic.defaultAddress(system)
if (a == null)
if (a eq null)
throw new IllegalStateException("defaultAddress not available yet")
a
}

View file

@ -218,7 +218,7 @@ case class DefaultOptimalSizeExploringResizer(
case ActorRefRoutee(a: ActorRefWithCell) =>
a.underlying match {
case cell: ActorCell =>
cell.mailbox.numberOfMessages + (if (cell.currentMessage != null) 1 else 0)
cell.mailbox.numberOfMessages + (if (cell.currentMessage ne null) 1 else 0)
case cell => cell.numberOfMessages
}
case _ => 0

View file

@ -83,7 +83,7 @@ object Serialization {
catch { case NonFatal(_) => path.toSerializationFormat }
}
case Information(address, system) =>
if (originalSystem == null || originalSystem == system)
if ((originalSystem eq null) || originalSystem == system)
path.toSerializationFormatWithAddress(address)
else {
val provider = originalSystem.provider

View file

@ -119,7 +119,7 @@ private[pekko] object Reflect {
} else null
}
if (constructor == null) error("no matching constructor")
if (constructor eq null) error("no matching constructor")
else constructor
}
@ -135,7 +135,7 @@ private[pekko] object Reflect {
def findMarker(root: Class[_], marker: Class[_]): Type = {
@tailrec def rec(curr: Class[_]): Type = {
if (curr.getSuperclass != null && marker.isAssignableFrom(curr.getSuperclass)) rec(curr.getSuperclass)
if ((curr.getSuperclass ne null) && marker.isAssignableFrom(curr.getSuperclass)) rec(curr.getSuperclass)
else
curr.getGenericInterfaces.collectFirst {
case c: Class[_] if marker.isAssignableFrom(c) => c
@ -159,7 +159,7 @@ private[pekko] object Reflect {
.from(2 /*is the magic number, promise*/ )
.map(get)
.dropWhile { c =>
c != null &&
(c ne null) &&
(c.getName.startsWith("org.apache.pekko.actor.ActorSystem") ||
c.getName.startsWith("scala.Option") ||
c.getName.startsWith("scala.collection.Iterator") ||

View file

@ -68,7 +68,7 @@ class DirectByteBufferPoolBenchmark {
def unpooledHeapAllocAndRelease(): Unit = {
val idx = random.nextInt(unpooledHeapBuffers.length)
val oldBuf = unpooledHeapBuffers(idx)
if (oldBuf != null) DirectByteBufferPool.tryCleanDirectByteBuffer(oldBuf)
if (oldBuf ne null) DirectByteBufferPool.tryCleanDirectByteBuffer(oldBuf)
unpooledHeapBuffers(idx) = ByteBuffer.allocateDirect(size)
}
@ -76,7 +76,7 @@ class DirectByteBufferPoolBenchmark {
def unpooledDirectAllocAndRelease(): Unit = {
val idx = random.nextInt(unpooledDirectBuffers.length)
val oldBuf = unpooledDirectBuffers(idx)
if (oldBuf != null) DirectByteBufferPool.tryCleanDirectByteBuffer(oldBuf)
if (oldBuf ne null) DirectByteBufferPool.tryCleanDirectByteBuffer(oldBuf)
unpooledDirectBuffers(idx) = ByteBuffer.allocateDirect(size)
}
@ -84,7 +84,7 @@ class DirectByteBufferPoolBenchmark {
def pooledDirectAllocAndRelease(): Unit = {
val idx = random.nextInt(pooledDirectBuffers.length)
val oldBuf = pooledDirectBuffers(idx)
if (oldBuf != null) arteryPool.release(oldBuf)
if (oldBuf ne null) arteryPool.release(oldBuf)
pooledDirectBuffers(idx) = arteryPool.acquire()
}

View file

@ -121,7 +121,7 @@ object ClusterShardingSettings {
}
private def option(role: String): Option[String] =
if (role == "" || role == null) None else Option(role)
if (role == "" || (role eq null)) None else Option(role)
sealed trait StateStoreMode { def name: String }

View file

@ -174,7 +174,7 @@ import pekko.util.JavaDurationConverters._
val extractorAdapter = new ExtractorAdapter(extractor)
val extractEntityId: ShardRegion.ExtractEntityId = {
// TODO is it possible to avoid the double evaluation of entityId
case message if extractorAdapter.entityId(message) != null =>
case message if extractorAdapter.entityId(message) ne null =>
(extractorAdapter.entityId(message), extractorAdapter.unwrapMessage(message))
}
val extractShardId: ShardRegion.ExtractShardId = { message =>

View file

@ -1045,7 +1045,7 @@ private[pekko] class Shard(
private def deliverMessage(msg: Any, snd: ActorRef): Unit = {
val (entityId, payload) = extractEntityId(msg)
if (entityId == null || entityId == "") {
if ((entityId eq null) || entityId == "") {
log.warning("{}: Id must not be empty, dropping message [{}]", typeName, msg.getClass.getName)
context.system.deadLetters ! Dropped(msg, "No recipient entity id", snd, self)
} else {

View file

@ -1363,7 +1363,7 @@ private class PersistentShardCoordinator(
state = st.withRememberEntities(settings.rememberEntities)
// Old versions of the state object may not have unallocatedShard set,
// thus it will be null.
if (state.unallocatedShards == null)
if (state.unallocatedShards eq null)
state = state.copy(unallocatedShards = Set.empty)
case RecoveryCompleted =>

View file

@ -1309,7 +1309,7 @@ private[pekko] class ShardRegion(
if (verboseDebug)
log.debug("{}: Forwarding message for shard [{}] to [{}]", typeName, shardId, shardRegionRef)
shardRegionRef.tell(msg, snd)
case None if shardId == null || shardId == "" =>
case None if (shardId eq null) || shardId == "" =>
log.warning("{}: Shard must not be empty, dropping message [{}]", typeName, msg.getClass.getName)
context.system.deadLetters ! msg
case None =>

View file

@ -159,7 +159,7 @@ object DistributedPubSubMediator {
@SerialVersionUID(1L) final case class Put(ref: ActorRef)
@SerialVersionUID(1L) final case class Remove(path: String)
@SerialVersionUID(1L) final case class Subscribe(topic: String, group: Option[String], ref: ActorRef) {
require(topic != null && topic != "", "topic must be defined")
require((topic ne null) && topic != "", "topic must be defined")
/**
* Convenience constructor with `group` None
@ -175,7 +175,7 @@ object DistributedPubSubMediator {
def apply(topic: String, ref: ActorRef) = new Subscribe(topic, ref)
}
@SerialVersionUID(1L) final case class Unsubscribe(topic: String, group: Option[String], ref: ActorRef) {
require(topic != null && topic != "", "topic must be defined")
require((topic ne null) && topic != "", "topic must be defined")
def this(topic: String, ref: ActorRef) = this(topic, None, ref)
def this(topic: String, group: String, ref: ActorRef) = this(topic, Some(group), ref)
}

View file

@ -470,7 +470,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
system.stop(clusterDaemons)
// readView might be null if init fails before it is created
if (readView != null)
if (readView ne null)
readView.close()
closeScheduler()

View file

@ -279,7 +279,7 @@ private[pekko] trait ClusterRouterSettingsBase {
require(totalInstances > 0, "totalInstances of cluster router must be > 0")
require(useRoles != null, "useRoles must be non-null")
require(!useRoles.exists(role => role == null || role.isEmpty), "All roles in useRoles must be non-empty")
require(!useRoles.exists(role => (role eq null) || role.isEmpty), "All roles in useRoles must be non-empty")
}
/**

View file

@ -46,21 +46,21 @@ trait SerializationSupport {
@volatile
private var ser: Serialization = _
def serialization: Serialization = {
if (ser == null) ser = SerializationExtension(system)
if (ser eq null) ser = SerializationExtension(system)
ser
}
@volatile
private var protocol: String = _
def addressProtocol: String = {
if (protocol == null) protocol = system.provider.getDefaultAddress.protocol
if (protocol eq null) protocol = system.provider.getDefaultAddress.protocol
protocol
}
@volatile
private var transportInfo: Serialization.Information = _
def transportInformation: Serialization.Information = {
if (transportInfo == null) {
if (transportInfo eq null) {
val address = system.provider.getDefaultAddress
transportInfo = Serialization.Information(address, system)
}

View file

@ -92,7 +92,7 @@ class BundleDelegatingClassLoader(bundle: Bundle, fallBackClassLoader: ClassLoad
} else {
val wiring = b.adapt(classOf[BundleWiring])
val direct: Set[Bundle] =
if (wiring == null) Set.empty
if (wiring eq null) Set.empty
else {
val requiredWires: List[BundleWire] =
wiring.getRequiredWires(BundleRevision.PACKAGE_NAMESPACE).asScala.toList

View file

@ -55,7 +55,7 @@ final case class Sequence(value: Long) extends Offset with Ordered[Sequence] {
* as the `offset` parameter in a subsequent query.
*/
final case class TimeBasedUUID(value: UUID) extends Offset with Ordered[TimeBasedUUID] {
if (value == null || value.version != 1) {
if ((value eq null) || value.version != 1) {
throw new IllegalArgumentException("UUID " + value + " is not a time-based UUID")
}

View file

@ -567,7 +567,7 @@ private[persistence] trait Eventsourced
*/
def recoveryRunning: Boolean = {
// currentState is null if this is called from constructor
if (currentState == null) true else currentState.recoveryRunning
if (currentState eq null) true else currentState.recoveryRunning
}
/**

View file

@ -199,7 +199,7 @@ object Persistence extends ExtensionId[Persistence] with ExtensionIdProvider {
/** Check for default or missing identity. */
private def isEmpty(text: String) = {
text == null || text.isEmpty
(text eq null) || text.isEmpty
}
}

View file

@ -110,5 +110,5 @@ private[pekko] abstract class PersistencePlugin[ScalaDsl, JavaDsl, T: ClassTag](
}
/** Check for default or missing identity. */
private def isEmpty(text: String): Boolean = text == null || text.length == 0
private def isEmpty(text: String): Boolean = (text eq null) || text.isEmpty
}

View file

@ -394,21 +394,21 @@ trait PersistentFSMBase[S, D, E] extends Actor with Listeners with ActorLogging
*/
@deprecated("Removed from API, called internally", "Akka 2.4.5")
private[pekko] final def initialize(): Unit =
if (currentState != null) makeTransition(currentState)
if (currentState ne null) makeTransition(currentState)
else throw new IllegalStateException("You must call `startWith` before calling `initialize`")
/**
* Return current state name (i.e. object of type S)
*/
final def stateName: S =
if (currentState != null) currentState.stateName
if (currentState ne null) currentState.stateName
else throw new IllegalStateException("You must call `startWith` before using `stateName`")
/**
* Return current state data (i.e. object of type D)
*/
final def stateData: D =
if (currentState != null) currentState.stateData
if (currentState ne null) currentState.stateData
else throw new IllegalStateException("You must call `startWith` before using `stateData`")
/**

View file

@ -80,7 +80,7 @@ class DurableStateStoreRegistry(system: ExtendedActorSystem)
/** Check for default or missing identity. */
private def isEmpty(text: String) = {
text == null || text.isEmpty
(text eq null) || text.isEmpty
}
/**

View file

@ -279,7 +279,7 @@ private[remote] class ReliableDeliverySupervisor(
override val supervisorStrategy = OneForOneStrategy(loggingEnabled = false) {
case _: AssociationProblem => Escalate
case NonFatal(e) =>
val causedBy = if (e.getCause == null) "" else s"Caused by: [${e.getCause.getMessage}]"
val causedBy = if (e.getCause eq null) "" else s"Caused by: [${e.getCause.getMessage}]"
log.warning(
"Association with remote system [{}] has failed, address is now gated for [{}] ms. Reason: [{}] {}",
remoteAddress,

View file

@ -94,7 +94,7 @@ private[pekko] class RemoteSystemDaemon(
@tailrec private def addChildParentNeedsWatch(parent: ActorRef, child: ActorRef): Boolean =
parent2children.get(parent) match {
case null =>
if (parent2children.putIfAbsent(parent, Set(child)) == null) true
if (parent2children.putIfAbsent(parent, Set(child)) eq null) true
else addChildParentNeedsWatch(parent, child)
case children =>
if (parent2children.replace(parent, children, children + child)) false

View file

@ -588,7 +588,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter)
OneForOneStrategy(loggingEnabled = false) {
case InvalidAssociation(localAddress, remoteAddress, reason, disassociationInfo) =>
keepQuarantinedOr(remoteAddress) {
val causedBy = if (reason.getCause == null) "" else s"Caused by: [${reason.getCause.getMessage}]"
val causedBy = if (reason.getCause eq null) "" else s"Caused by: [${reason.getCause.getMessage}]"
log.warning(
"Tried to associate with unreachable remote address [{}]. " +
"Address is now gated for {} ms, all messages to this address will be delivered to dead letters. " +

View file

@ -1026,7 +1026,7 @@ private[remote] class Association(
}
def isConnectException: Boolean =
cause.isInstanceOf[StreamTcpException] && cause.getCause != null && cause.getCause
cause.isInstanceOf[StreamTcpException] && (cause.getCause ne null) && cause.getCause
.isInstanceOf[ConnectException]
if (stoppedIdle) {

View file

@ -768,7 +768,7 @@ private[remote] class DuplicateHandshakeReq(
push(out, currentIterator.next())
} finally {
val buf = envelope.envelopeBuffer
if (buf != null) {
if (buf ne null) {
envelope.releaseEnvelopeBuffer()
bufferPool.release(buf)
}
@ -834,7 +834,7 @@ private[remote] class DuplicateFlush(numberOfLanes: Int, system: ExtendedActorSy
push(out, currentIterator.next())
} finally {
val buf = envelope.envelopeBuffer
if (buf != null) {
if (buf ne null) {
envelope.releaseEnvelopeBuffer()
bufferPool.release(buf)
}

View file

@ -467,12 +467,12 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
.stop()
.map { _ =>
flightRecorder.transportStopped()
if (aeronErrorLogTask != null) {
if (aeronErrorLogTask ne null) {
aeronErrorLogTask.cancel()
flightRecorder.transportAeronErrorLogTaskStopped()
}
if (aeron != null) aeron.close()
if (aeronErrorLog != null) aeronErrorLog.close()
if (aeron ne null) aeron.close()
if (aeronErrorLog ne null) aeronErrorLog.close()
if (mediaDriver.get.isDefined) stopMediaDriver()
Done

View file

@ -203,7 +203,7 @@ private[pekko] class TaskRunner(system: ExtendedActorSystem, val idleCpuLevel: I
case Shutdown =>
running = false
tasks.removeAll() // gc friendly
while (cmdQueue.poll() != null) () // gc friendly
while (cmdQueue.poll() ne null) () // gc friendly
shutdown.trySuccess(Done)
}
}

View file

@ -94,7 +94,7 @@ private[pekko] abstract class AbstractActorRefResolveCache[R <: ActorRef: ClassT
ref match {
case r: RemoteActorRef =>
val cachedAssociation = r.cachedAssociation
if (cachedAssociation != null && cachedAssociation.isRemovedAfterQuarantined())
if ((cachedAssociation ne null) && cachedAssociation.isRemovedAfterQuarantined())
r.cachedAssociation = null
case _ =>
}

View file

@ -39,12 +39,12 @@ private[pekko] class ThrowableSupport(system: ExtendedActorSystem) {
def toProtobufThrowable(t: Throwable): ContainerFormats.Throwable.Builder = {
val b = ContainerFormats.Throwable.newBuilder().setClassName(t.getClass.getName)
if (t.getMessage != null)
if (t.getMessage ne null)
b.setMessage(t.getMessage)
if (t.getCause != null)
if (t.getCause ne null)
b.setCause(payloadSupport.payloadBuilder(t.getCause))
val stackTrace = t.getStackTrace
if (stackTrace != null) {
if (stackTrace ne null) {
var i = 0
while (i < stackTrace.length) {
b.addStackTrace(stackTraceElementBuilder(stackTrace(i)))

View file

@ -584,9 +584,9 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
case _: CancellationException => throw new NettyTransportExceptionNoStack("Connection was cancelled")
case NonFatal(t) =>
val msg =
if (t.getCause == null)
if (t.getCause eq null)
t.getMessage
else if (t.getCause.getCause == null)
else if (t.getCause.getCause eq null)
s"${t.getMessage}, caused by: ${t.getCause}"
else
s"${t.getMessage}, caused by: ${t.getCause}, caused by: ${t.getCause.getCause}"

View file

@ -69,7 +69,7 @@ import pekko.util.OptionVal
}
@tailrec private def isAllowedSpringClass(clazz: Class[_]): Boolean = {
if (clazz == null || clazz.equals(classOf[java.lang.Object]))
if ((clazz eq null) || clazz.equals(classOf[java.lang.Object]))
true
else {
val name = clazz.getSimpleName
@ -88,7 +88,7 @@ import pekko.util.OptionVal
Set(classOf[java.io.Serializable], classOf[java.io.Serializable], classOf[java.lang.Comparable[_]])
def isGZipped(bytes: Array[Byte]): Boolean = {
(bytes != null) && (bytes.length >= 2) &&
(bytes ne null) && (bytes.length >= 2) &&
(bytes(0) == GZIPInputStream.GZIP_MAGIC.toByte) &&
(bytes(1) == (GZIPInputStream.GZIP_MAGIC >> 8).toByte)
}

View file

@ -935,7 +935,7 @@ object TestSubscriber {
}
// if no subscription was obtained yet, we expect it
if (_subscription == null) self.expectSubscription()
if (_subscription eq null) self.expectSubscription()
_subscription.request(Long.MaxValue)
drain()

View file

@ -673,7 +673,7 @@ object Attributes {
* If the name is null or empty the name is ignored, i.e. [[#none]] is returned.
*/
def name(name: String): Attributes =
if (name == null || name.isEmpty) none
if ((name eq null) || name.isEmpty) none
else Attributes(Name(name))
/**

View file

@ -85,7 +85,7 @@ import org.reactivestreams.{ Subscriber, Subscription }
if (element == null) throw elementMustNotBeNullException
final def requireNonNullSubscription(subscription: Subscription): Unit =
if (subscription == null) throw subscriptionMustNotBeNullException
if (subscription eq null) throw subscriptionMustNotBeNullException
sealed trait SpecViolation extends Throwable

View file

@ -158,7 +158,7 @@ import org.reactivestreams.Subscription
}
}
if (s == null) {
if (s eq null) {
val ex = subscriberMustNotBeNullException
try rec(Inert.subscriber)
finally throw ex // must throw NPE, rule 2:13
@ -197,7 +197,7 @@ import org.reactivestreams.Subscription
}
}
if (s == null) {
if (s eq null) {
val ex = subscriptionMustNotBeNullException
try rec(ErrorPublisher(ex, "failed-VirtualProcessor"))
finally throw ex // must throw NPE, rule 2:13
@ -290,10 +290,10 @@ import org.reactivestreams.Subscription
s"VirtualPublisher#$hashCode($other).onError(${ex.getMessage}). spec violation or cancellation race")
}
val ex = if (t == null) exceptionMustNotBeNullException else t
val ex = if (t eq null) exceptionMustNotBeNullException else t
rec(ex)
// must throw NPE, rule 2.13
if (t == null) throw ex
if (t eq null) throw ex
}
@tailrec override def onComplete(): Unit = {

View file

@ -163,7 +163,7 @@ private[pekko] final class UnfoldJava[S, E](s: S, f: function.Function[S, Option
handle(future.getNow(null))
} else {
future.handle((r, ex) => {
if (ex != null) {
if (ex ne null) {
asyncHandler(Failure(ex))
} else {
asyncHandler(Success(r))

View file

@ -249,7 +249,7 @@ import org.reactivestreams.Subscription
} else if (upstreamCompleted) {
// onComplete or onError has been called before OnSubscribe
tryCancel(subscription, SubscriptionWithCancelException.NoMoreElementsNeeded)
} else if (upstream != null) { // reactive streams spec 2.5
} else if (upstream ne null) { // reactive streams spec 2.5
tryCancel(subscription, new IllegalStateException("Publisher can only be subscribed once."))
} else {
upstream = subscription
@ -573,7 +573,7 @@ import org.reactivestreams.Subscription
(logic, event, promise, handler) => {
val asyncInput = AsyncInput(this, logic, event, promise, handler)
val currentInterpreter = GraphInterpreter.currentInterpreterOrNull
if (currentInterpreter == null || (currentInterpreter.context ne self))
if ((currentInterpreter eq null) || (currentInterpreter.context ne self))
self ! asyncInput
else enqueueToShortCircuit(asyncInput)
}, attributes.mandatoryAttribute[ActorAttributes.FuzzingMode].enabled, self)
@ -785,7 +785,7 @@ import org.reactivestreams.Subscription
override def preStart(): Unit = {
tryInit(_initial)
if (activeInterpreters.isEmpty) context.stop(self)
else if (shortCircuitBuffer != null) shortCircuitBatch()
else if (shortCircuitBuffer ne null) shortCircuitBatch()
}
@tailrec private def shortCircuitBatch(): Unit = {
@ -826,11 +826,11 @@ import org.reactivestreams.Subscription
case b: BoundaryEvent =>
currentLimit = eventLimit
processEvent(b)
if (shortCircuitBuffer != null) shortCircuitBatch()
if (shortCircuitBuffer ne null) shortCircuitBatch()
case Resume =>
currentLimit = eventLimit
if (shortCircuitBuffer != null) shortCircuitBatch()
if (shortCircuitBuffer ne null) shortCircuitBatch()
case Snapshot =>
sender() ! StreamSnapshotImpl(

View file

@ -310,7 +310,7 @@ import pekko.stream.stage._
* materializer for the GraphInterpreterfusing is only an optimization.
*/
def init(subMat: Materializer): Unit = {
_subFusingMaterializer = if (subMat == null) materializer else subMat
_subFusingMaterializer = if (subMat eq null) materializer else subMat
var i = 0
while (i < logics.length) {
val logic = logics(i)
@ -376,7 +376,7 @@ import pekko.stream.stage._
chaseCounter = math.min(ChaseLimit, eventsRemaining)
def reportStageError(e: Throwable): Unit = {
if (activeStage == null) throw e
if (activeStage eq null) throw e
else {
val loggingEnabled = activeStage.attributes.get[LogLevels] match {
case Some(levels) => levels.onFailure != LogLevels.Off
@ -603,7 +603,7 @@ import pekko.stream.stage._
}
// Returns true if the given stage is already completed
def isStageCompleted(stage: GraphStageLogic): Boolean = stage != null && shutdownCounter(stage.stageId) == 0
def isStageCompleted(stage: GraphStageLogic): Boolean = (stage ne null) && shutdownCounter(stage.stageId) == 0
// Returns true if the given stage is already finalized
private def isStageFinalized(stage: GraphStageLogic): Boolean = finalizedMark(stage.stageId)
@ -712,7 +712,7 @@ import pekko.stream.stage._
LogicSnapshotImpl(idx, logic.toString, logic.attributes)
}
val logicIndexes = logics.zipWithIndex.map { case (stage, idx) => stage -> idx }.toMap
val connectionSnapshots = connections.filter(_ != null).map { connection =>
val connectionSnapshots = connections.filter(_ ne null).map { connection =>
ConnectionSnapshotImpl(
connection.id,
logicSnapshots(logicIndexes(connection.inOwner)),

View file

@ -50,7 +50,7 @@ private[pekko] final class MapConcat[In, Out](f: In => IterableOnce[Out])
private var currentIterator: Iterator[Out] = _
private def hasNext = currentIterator != null && currentIterator.hasNext
private def hasNext = (currentIterator ne null) && currentIterator.hasNext
override def onPush(): Unit =
try {

View file

@ -2362,7 +2362,7 @@ private[pekko] final class StatefulMapConcat[In, Out](val factory: StatefulMapCo
private var accumulator: StatefulMapConcatAccumulator[In, Out] = factory.accumulator()
private val contextPropagation = ContextPropagation()
private def hasNext = if (currentIterator != null) currentIterator.hasNext else false
private def hasNext = if (currentIterator ne null) currentIterator.hasNext else false
setHandlers(in, out, this)

View file

@ -358,7 +358,7 @@ import pekko.util.ccompat.JavaConverters._
val key = keyFor(elem)
require(key != null, "Key cannot be null")
val substreamSource = activeSubstreamsMap.get(key)
if (substreamSource != null) {
if (substreamSource ne null) {
if (substreamSource.isAvailable) substreamSource.push(elem)
else {
nextElementKey = key
@ -397,7 +397,7 @@ import pekko.util.ccompat.JavaConverters._
override protected def onTimer(timerKey: Any): Unit = {
val substreamSource = activeSubstreamsMap.get(timerKey)
if (substreamSource != null) {
if (substreamSource ne null) {
if (!allowClosedSubstreamRecreation) {
closedSubstreams.add(timerKey)
}

View file

@ -118,7 +118,7 @@ private[pekko] final class InputStreamSource(factory: () => InputStream, chunkSi
private def closeInputStream(): Unit = {
try {
if (inputStream != null)
if (inputStream ne null)
inputStream.close()
} catch {
case NonFatal(ex) =>

View file

@ -87,7 +87,7 @@ private[pekko] final class OutputStreamGraphStage(factory: () => OutputStream, a
override def postStop(): Unit = {
try {
if (outputStream != null) {
if (outputStream ne null) {
outputStream.flush()
outputStream.close()
}

View file

@ -460,7 +460,7 @@ import pekko.util.ByteString
@tailrec
private def runDelegatedTasks(): Unit = {
val task = engine.getDelegatedTask
if (task != null) {
if (task ne null) {
if (tracing) log.debug("running task")
task.run()
runDelegatedTasks()

View file

@ -401,7 +401,7 @@ private[stream] object ConnectionSourceStage {
connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained
else
connection ! Close
} else if (connection != null) {
} else if (connection ne null) {
// We still read, so we only close the write side
if (writeInProgress)
connectionClosePending = true // will continue when WriteAck is received and writeBuffer drained
@ -414,7 +414,7 @@ private[stream] object ConnectionSourceStage {
}
private def closeConnectionDownstreamFinished(): Unit = {
if (connection == null) {
if (connection eq null) {
// This is an outbound connection for which downstream finished
// before the connection was even established.
// In that case we close the connection as soon as upstream finishes
@ -479,7 +479,7 @@ private[stream] object ConnectionSourceStage {
closeConnectionUpstreamFinished()
override def onUpstreamFailure(ex: Throwable): Unit = {
if (connection != null) {
if (connection ne null) {
if (interpreter.log.isDebugEnabled) {
val msg = "Aborting tcp connection to {} because of upstream failure: {}"

View file

@ -3383,7 +3383,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Flow[In, Out, Mat] = {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
@ -3465,7 +3465,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Flow[In, Out, Mat] = {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}

View file

@ -412,7 +412,7 @@ object Sink {
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
: Sink[T, NotUsed] = {
import pekko.util.ccompat.JavaConverters._
val seq = if (rest != null) rest.asScala.map(_.asScala).toSeq else immutable.Seq()
val seq = if (rest ne null) rest.asScala.map(_.asScala).toSeq else immutable.Seq()
new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num => fanOutStrategy.apply(num)))
}
@ -438,7 +438,7 @@ object Sink {
sinks: java.util.List[_ <: Graph[SinkShape[U], M]],
fanOutStrategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], NotUsed]])
: Sink[T, java.util.List[M]] = {
val seq = if (sinks != null) CollectionUtil.toSeq(sinks).collect {
val seq = if (sinks ne null) CollectionUtil.toSeq(sinks).collect {
case sink: Sink[U @unchecked, M @unchecked] => sink.asScala
case other => other
}

View file

@ -652,7 +652,7 @@ object Source {
rest: java.util.List[Source[T, _ <: Any]],
fanInStrategy: function.Function[java.lang.Integer, _ <: Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, NotUsed] = {
val seq = if (rest != null) CollectionUtil.toSeq(rest).map(_.asScala) else immutable.Seq()
val seq = if (rest ne null) CollectionUtil.toSeq(rest).map(_.asScala) else immutable.Seq()
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num => fanInStrategy.apply(num)))
}
@ -677,7 +677,7 @@ object Source {
sources: java.util.List[_ <: Graph[SourceShape[T], M]],
fanInStrategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], NotUsed]])
: Source[U, java.util.List[M]] = {
val seq = if (sources != null) CollectionUtil.toSeq(sources).collect {
val seq = if (sources ne null) CollectionUtil.toSeq(sources).collect {
case source: Source[T @unchecked, M @unchecked] => source.asScala
case other => other
}
@ -689,7 +689,7 @@ object Source {
* Combine the elements of multiple streams into a stream of lists.
*/
def zipN[T](sources: java.util.List[Source[T, _ <: Any]]): Source[java.util.List[T], NotUsed] = {
val seq = if (sources != null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq()
val seq = if (sources ne null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq()
new Source(scaladsl.Source.zipN(seq).map(_.asJava))
}
@ -699,7 +699,7 @@ object Source {
def zipWithN[T, O](
zipper: function.Function[java.util.List[T], O],
sources: java.util.List[Source[T, _ <: Any]]): Source[O, NotUsed] = {
val seq = if (sources != null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq()
val seq = if (sources ne null) CollectionUtil.toSeq(sources).map(_.asScala) else immutable.Seq()
new Source(scaladsl.Source.zipWithN[T, O](seq => zipper.apply(seq.asJava))(seq))
}
@ -949,7 +949,7 @@ object Source {
sourcesAndPriorities: java.util.List[Pair[Source[T, _ <: Any], java.lang.Integer]],
eagerComplete: Boolean): javadsl.Source[T, NotUsed] = {
val seq =
if (sourcesAndPriorities != null)
if (sourcesAndPriorities ne null)
CollectionUtil.toSeq(sourcesAndPriorities).map(pair => (pair.first.asScala, pair.second.intValue()))
else
immutable.Seq()
@ -1714,7 +1714,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): javadsl.Source[Out, Mat] = {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
@ -1794,7 +1794,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): javadsl.Source[Out, Mat] = {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}

View file

@ -2188,7 +2188,7 @@ final class SubFlow[In, Out, Mat](
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubFlow[In, Out, Mat] = {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
@ -2246,7 +2246,7 @@ final class SubFlow[In, Out, Mat](
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): SubFlow[In, Out, Mat] = {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}

View file

@ -2159,7 +2159,7 @@ final class SubSource[Out, Mat](
def mergeAll(
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
eagerComplete: Boolean): SubSource[Out, Mat] = {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}
@ -2218,7 +2218,7 @@ final class SubSource[Out, Mat](
those: java.util.List[_ <: Graph[SourceShape[Out], _ <: Any]],
segmentSize: Int,
eagerClose: Boolean): SubSource[Out, Mat] = {
val seq = if (those != null) CollectionUtil.toSeq(those).collect {
val seq = if (those ne null) CollectionUtil.toSeq(those).collect {
case source: Source[Out @unchecked, _] => source.asScala
case other => other
}

View file

@ -125,7 +125,7 @@ final class Merge[T](val inputPorts: Int, val eagerComplete: Boolean) extends Gr
@tailrec
private def dequeueAndDispatch(): Unit = {
val in = pendingQueue.dequeue()
if (in == null) {
if (in eq null) {
// in is null if we reached the end of the queue
if (upstreamsClosed) completeStage()
} else if (isAvailable(in)) {
@ -417,7 +417,7 @@ final class MergePrioritized[T] private (val priorities: Seq[Int], val eagerComp
var next: Inlet[T] = null
ix = 0
while (ix < in.length && next == null) {
while (ix < in.length && (next eq null)) {
if (isAvailable(in(ix))) {
r -= priorities(ix)
if (r < 0) next = in(ix)

View file

@ -607,7 +607,7 @@ private[pekko] class BroadcastHub[T](startAfterNrOfConsumers: Int, bufferSize: I
tryPull()
case UnRegister(id, previousOffset, finalOffset) =>
if (findAndRemoveConsumer(id, previousOffset) != null)
if (findAndRemoveConsumer(id, previousOffset) ne null)
activeConsumers -= 1
if (activeConsumers == 0) {
if (isClosed(in)) completeStage()

View file

@ -388,7 +388,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* INTERNAL API
*/
private[pekko] def interpreter: GraphInterpreter =
if (_interpreter == null)
if (_interpreter eq null)
throw new IllegalStateException(
"not yet initialized: only setHandler is allowed in GraphStageLogic constructor. To access materializer use Source/Flow/Sink.fromMaterializer factory")
else _interpreter
@ -457,7 +457,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
*/
final protected def setHandler(in: Inlet[_], handler: InHandler): Unit = {
handlers(in.id) = handler
if (_interpreter != null) _interpreter.setHandler(conn(in), handler)
if (_interpreter ne null) _interpreter.setHandler(conn(in), handler)
}
/**
@ -480,7 +480,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
*/
final protected def setHandler(out: Outlet[_], handler: OutHandler): Unit = {
handlers(out.id + inCount) = handler
if (_interpreter != null) _interpreter.setHandler(conn(out), handler)
if (_interpreter ne null) _interpreter.setHandler(conn(out), handler)
}
private def conn(in: Inlet[_]): Connection = portToConn(in.id)
@ -1063,7 +1063,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
protected def followUp(): Unit = {
setHandler(out, previous)
andThen()
if (followUps != null) {
if (followUps ne null) {
/*
* If (while executing andThen() callback) handler was changed to new emitting,
@ -1080,7 +1080,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
* If next element is emitting completion and there are some elements after it,
* we to need pass them before completion
*/
if (next.followUps != null) {
if (next.followUps ne null) {
setHandler(out, dequeueHeadAndAddToTail(next))
} else {
complete(out)
@ -1092,7 +1092,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
}
def addFollowUp(e: Emitting[T]): Unit =
if (followUps == null) {
if (followUps eq null) {
followUps = e
followUpsTail = e
} else {
@ -1185,7 +1185,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
override def onUpstreamFailure(ex: Throwable): Unit = if (doFail) failStage(ex)
}
val ph = new PassAlongHandler
if (_interpreter != null) {
if (_interpreter ne null) {
if (isAvailable(from)) emit(to, grab(from), ph)
if (doFinish && isClosed(from)) completeStage()
}
@ -1212,7 +1212,7 @@ abstract class GraphStageLogic private[stream] (val inCount: Int, val outCount:
*/
final def getAsyncCallback[T](handler: T => Unit): AsyncCallback[T] = {
val callback = new ConcurrentAsyncCallback[T](handler)
if (_interpreter != null) callback.onStart()
if (_interpreter ne null) callback.onStart()
else callbacksWaitingForInterpreter = callback :: callbacksWaitingForInterpreter
callback
}

View file

@ -742,7 +742,7 @@ trait TestKitBase {
var elem: AnyRef = queue.peekFirst()
var left = leftNow
while (left.toNanos > 0 && elem == null) {
while (left.toNanos > 0 && (elem eq null)) {
// Use of (left / 2) gives geometric series limited by finish time similar to (1/2)^n limited by 1,
// so it is very precise
Thread.sleep(pollInterval.toMillis min (left / 2).toMillis)

View file

@ -47,7 +47,7 @@ class EventFilter(clazz: Class[_], system: ActorSystem) {
def intercept[T](code: Supplier[T]): T = {
val filter: pekko.testkit.EventFilter =
if (_clazz eq classOf[Logging.Error]) {
if (exceptionType == null) exceptionType = Logging.noCause.getClass
if (exceptionType eq null) exceptionType = Logging.noCause.getClass
new ErrorFilter(exceptionType, source, message, pattern, complete, occurrences)
} else if (_clazz eq classOf[Logging.Warning]) {
new WarningFilter(source, message, pattern, complete, occurrences)