Handle a negative value returned by Math.abs() #25034
This commit is contained in:
parent
1ca612985e
commit
05282b59c9
9 changed files with 10 additions and 10 deletions
|
|
@ -401,7 +401,7 @@ private[akka] final class FairDistributionHashCache( final val config: Config) e
|
||||||
|
|
||||||
override final def create(): QueueSelector = new AtomicReference[ImmutableIntMap](ImmutableIntMap.empty) with QueueSelector {
|
override final def create(): QueueSelector = new AtomicReference[ImmutableIntMap](ImmutableIntMap.empty) with QueueSelector {
|
||||||
override def toString: String = s"FairDistributionHashCache(fairDistributionThreshold = $fairDistributionThreshold)"
|
override def toString: String = s"FairDistributionHashCache(fairDistributionThreshold = $fairDistributionThreshold)"
|
||||||
private[this] final def improve(h: Int): Int = Math.abs(reverseBytes(h * 0x9e3775cd) * 0x9e3775cd) // `sbhash`: In memory of Phil Bagwell.
|
private[this] final def improve(h: Int): Int = 0x7FFFFFFF & (reverseBytes(h * 0x9e3775cd) * 0x9e3775cd) // `sbhash`: In memory of Phil Bagwell.
|
||||||
override final def getQueue(command: Runnable, queues: Int): Int = {
|
override final def getQueue(command: Runnable, queues: Int): Int = {
|
||||||
val runnableHash = command.hashCode()
|
val runnableHash = command.hashCode()
|
||||||
if (fairDistributionThreshold == 0)
|
if (fairDistributionThreshold == 0)
|
||||||
|
|
|
||||||
|
|
@ -603,7 +603,7 @@ private[akka] class DDataShard(
|
||||||
Array.tabulate(numberOfKeys)(i ⇒ ORSetKey[EntityId](s"shard-${typeName}-${shardId}-$i"))
|
Array.tabulate(numberOfKeys)(i ⇒ ORSetKey[EntityId](s"shard-${typeName}-${shardId}-$i"))
|
||||||
|
|
||||||
private def key(entityId: EntityId): ORSetKey[EntityId] = {
|
private def key(entityId: EntityId): ORSetKey[EntityId] = {
|
||||||
val i = (math.abs(entityId.hashCode) % numberOfKeys)
|
val i = (math.abs(entityId.hashCode % numberOfKeys))
|
||||||
stateKeys(i)
|
stateKeys(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1623,7 +1623,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
}
|
}
|
||||||
val chunk = (statusCount % totChunks).toInt
|
val chunk = (statusCount % totChunks).toInt
|
||||||
val status = Status(dataEntries.collect {
|
val status = Status(dataEntries.collect {
|
||||||
case (key, (_, _)) if math.abs(key.hashCode) % totChunks == chunk ⇒ (key, getDigest(key))
|
case (key, (_, _)) if math.abs(key.hashCode % totChunks) == chunk ⇒ (key, getDigest(key))
|
||||||
}, chunk, totChunks)
|
}, chunk, totChunks)
|
||||||
to ! status
|
to ! status
|
||||||
}
|
}
|
||||||
|
|
@ -1651,7 +1651,7 @@ final class Replicator(settings: ReplicatorSettings) extends Actor with ActorLog
|
||||||
val otherKeys = otherDigests.keySet
|
val otherKeys = otherDigests.keySet
|
||||||
val myKeys =
|
val myKeys =
|
||||||
if (totChunks == 1) dataEntries.keySet
|
if (totChunks == 1) dataEntries.keySet
|
||||||
else dataEntries.keysIterator.filter(key ⇒ math.abs(key.hashCode) % totChunks == chunk).toSet
|
else dataEntries.keysIterator.filter(key ⇒ math.abs(key.hashCode % totChunks) == chunk).toSet
|
||||||
val otherMissingKeys = myKeys diff otherKeys
|
val otherMissingKeys = myKeys diff otherKeys
|
||||||
val keys = (otherDifferentKeys ++ otherMissingKeys).take(maxDeltaElements)
|
val keys = (otherDifferentKeys ++ otherMissingKeys).take(maxDeltaElements)
|
||||||
if (keys.nonEmpty) {
|
if (keys.nonEmpty) {
|
||||||
|
|
|
||||||
|
|
@ -158,7 +158,7 @@ public class HubDocTest extends AbstractJavaTest {
|
||||||
RunnableGraph<Source<String, NotUsed>> runnableGraph =
|
RunnableGraph<Source<String, NotUsed>> runnableGraph =
|
||||||
producer.toMat(PartitionHub.of(
|
producer.toMat(PartitionHub.of(
|
||||||
String.class,
|
String.class,
|
||||||
(size, elem) -> Math.abs(elem.hashCode()) % size,
|
(size, elem) -> Math.abs(elem.hashCode() % size),
|
||||||
2, 256), Keep.right());
|
2, 256), Keep.right());
|
||||||
|
|
||||||
// By running/materializing the producer, we get back a Source, which
|
// By running/materializing the producer, we get back a Source, which
|
||||||
|
|
|
||||||
|
|
@ -118,7 +118,7 @@ class HubsDocSpec extends AkkaSpec with CompileOnlySpec {
|
||||||
// value to the left is used)
|
// value to the left is used)
|
||||||
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
|
val runnableGraph: RunnableGraph[Source[String, NotUsed]] =
|
||||||
producer.toMat(PartitionHub.sink(
|
producer.toMat(PartitionHub.sink(
|
||||||
(size, elem) ⇒ math.abs(elem.hashCode) % size,
|
(size, elem) ⇒ math.abs(elem.hashCode % size),
|
||||||
startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right)
|
startAfterNrOfConsumers = 2, bufferSize = 256))(Keep.right)
|
||||||
|
|
||||||
// By running/materializing the producer, we get back a Source, which
|
// By running/materializing the producer, we get back a Source, which
|
||||||
|
|
|
||||||
|
|
@ -485,7 +485,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
|
||||||
val b = env.originUid
|
val b = env.originUid
|
||||||
val hashA = 23 + a
|
val hashA = 23 + a
|
||||||
val hash: Int = 23 * hashA + java.lang.Long.hashCode(b)
|
val hash: Int = 23 * hashA + java.lang.Long.hashCode(b)
|
||||||
math.abs(hash) % inboundLanes
|
math.abs(hash % inboundLanes)
|
||||||
case OptionVal.None ⇒
|
case OptionVal.None ⇒
|
||||||
// the lane is set by the DuplicateHandshakeReq stage, otherwise 0
|
// the lane is set by the DuplicateHandshakeReq stage, otherwise 0
|
||||||
env.lane
|
env.lane
|
||||||
|
|
|
||||||
|
|
@ -408,7 +408,7 @@ private[remote] class Association(
|
||||||
OrdinaryQueueIndex
|
OrdinaryQueueIndex
|
||||||
} else {
|
} else {
|
||||||
// select lane based on destination, to preserve message order
|
// select lane based on destination, to preserve message order
|
||||||
OrdinaryQueueIndex + (math.abs(r.path.uid) % outboundLanes)
|
OrdinaryQueueIndex + (math.abs(r.path.uid % outboundLanes))
|
||||||
}
|
}
|
||||||
r.cachedSendQueueIndex = idx
|
r.cachedSendQueueIndex = idx
|
||||||
idx
|
idx
|
||||||
|
|
|
||||||
|
|
@ -172,7 +172,7 @@ object PartitionHub {
|
||||||
* @param partitioner Function that decides where to route an element. The function takes two parameters;
|
* @param partitioner Function that decides where to route an element. The function takes two parameters;
|
||||||
* the first is the number of active consumers and the second is the stream element. The function should
|
* the first is the number of active consumers and the second is the stream element. The function should
|
||||||
* return the index of the selected consumer for the given element, i.e. int greater than or equal to 0
|
* return the index of the selected consumer for the given element, i.e. int greater than or equal to 0
|
||||||
* and less than number of consumers. E.g. `(size, elem) -> Math.abs(elem.hashCode()) % size`. It's also
|
* and less than number of consumers. E.g. `(size, elem) -> Math.abs(elem.hashCode() % size)`. It's also
|
||||||
* possible to use `-1` to drop the element.
|
* possible to use `-1` to drop the element.
|
||||||
* @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
|
* @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
|
||||||
* This is only used initially when the stage is starting up, i.e. it is not honored when consumers have
|
* This is only used initially when the stage is starting up, i.e. it is not honored when consumers have
|
||||||
|
|
|
||||||
|
|
@ -789,7 +789,7 @@ object PartitionHub {
|
||||||
* @param partitioner Function that decides where to route an element. The function takes two parameters;
|
* @param partitioner Function that decides where to route an element. The function takes two parameters;
|
||||||
* the first is the number of active consumers and the second is the stream element. The function should
|
* the first is the number of active consumers and the second is the stream element. The function should
|
||||||
* return the index of the selected consumer for the given element, i.e. int greater than or equal to 0
|
* return the index of the selected consumer for the given element, i.e. int greater than or equal to 0
|
||||||
* and less than number of consumers. E.g. `(size, elem) => math.abs(elem.hashCode) % size`. It's also
|
* and less than number of consumers. E.g. `(size, elem) => math.abs(elem.hashCode % size)`. It's also
|
||||||
* possible to use `-1` to drop the element.
|
* possible to use `-1` to drop the element.
|
||||||
* @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
|
* @param startAfterNrOfConsumers Elements are buffered until this number of consumers have been connected.
|
||||||
* This is only used initially when the stage is starting up, i.e. it is not honored when consumers have
|
* This is only used initially when the stage is starting up, i.e. it is not honored when consumers have
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue