diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala index dc20569925..af353a44dd 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ConsumerControllerImpl.scala @@ -528,7 +528,7 @@ private class ConsumerControllerImpl[A] private ( reverseCollectedChunks.foreach { seqMsg => builder ++= seqMsg.message.asInstanceOf[ChunkedMessage].serialized } - val bytes = builder.result().toArray + val bytes = builder.result().toArrayUnsafe() val head = collectedChunks.head // this is the last chunk val headMessage = head.message.asInstanceOf[ChunkedMessage] // serialization exceptions are thrown, because it will anyway be stuck with same error if retried and diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala index 85edfa6de6..1716a8e56e 100644 --- a/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/delivery/internal/ProducerControllerImpl.scala @@ -346,10 +346,10 @@ object ProducerControllerImpl { val manifest = Serializers.manifestFor(ser, mAnyRef) val serializerId = ser.identifier if (bytes.length <= chunkSize) { - ChunkedMessage(ByteString(bytes), firstChunk = true, lastChunk = true, serializerId, manifest) :: Nil + ChunkedMessage(ByteString.fromArrayUnsafe(bytes), firstChunk = true, lastChunk = true, serializerId, manifest) :: Nil } else { val builder = Vector.newBuilder[ChunkedMessage] - val chunksIter = ByteString(bytes).grouped(chunkSize) + val chunksIter = ByteString.fromArrayUnsafe(bytes).grouped(chunkSize) var first = true while (chunksIter.hasNext) { val chunk = chunksIter.next() diff --git a/akka-actor/src/main/scala-2.12/akka/util/ByteString.scala b/akka-actor/src/main/scala-2.12/akka/util/ByteString.scala index a155b455e1..8073c0b73b 100644 --- a/akka-actor/src/main/scala-2.12/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala-2.12/akka/util/ByteString.scala @@ -154,7 +154,7 @@ object ByteString { } private[akka] object ByteString1C extends Companion { - def fromString(s: String): ByteString1C = new ByteString1C(s.getBytes) + def fromString(s: String): ByteString1C = new ByteString1C(s.getBytes(StandardCharsets.UTF_8)) def apply(bytes: Array[Byte]): ByteString1C = new ByteString1C(bytes) val SerializationIdentity = 1.toByte @@ -264,7 +264,7 @@ object ByteString { /** INTERNAL API: ByteString backed by exactly one array, with start / end markers */ private[akka] object ByteString1 extends Companion { val empty: ByteString1 = new ByteString1(Array.empty[Byte]) - def fromString(s: String): ByteString1 = apply(s.getBytes) + def fromString(s: String): ByteString1 = apply(s.getBytes(StandardCharsets.UTF_8)) def apply(bytes: Array[Byte]): ByteString1 = apply(bytes, 0, bytes.length) def apply(bytes: Array[Byte], startIndex: Int, length: Int): ByteString1 = if (length == 0) empty @@ -803,8 +803,8 @@ sealed abstract class ByteString extends IndexedSeq[Byte] with IndexedSeqOptimiz * into it before returning it. * * This method of exposing the bytes of a ByteString can save one array - * copy and allocation in the happy path scenario and which can lead to better performance, - * however it also means that one MUST NOT modify the returned in array, or unexpected + * copy and allocation in the happy path scenario which can lead to better performance, + * however it also means that one MUST NOT modify the returned array, or unexpected * immutable data structure contract-breaking behavior will manifest itself. * * This API is intended for users who need to pass the byte array to some other API, which will diff --git a/akka-actor/src/main/scala-2.13/akka/util/ByteString.scala b/akka-actor/src/main/scala-2.13/akka/util/ByteString.scala index 9c6344db0e..e4e5643df5 100644 --- a/akka-actor/src/main/scala-2.13/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala-2.13/akka/util/ByteString.scala @@ -158,7 +158,7 @@ object ByteString { private[akka] object ByteString1C extends Companion { val empty = new ByteString1C(Array.emptyByteArray) - def fromString(s: String): ByteString1C = new ByteString1C(s.getBytes) + def fromString(s: String): ByteString1C = new ByteString1C(s.getBytes(StandardCharsets.UTF_8)) def apply(bytes: Array[Byte]): ByteString1C = new ByteString1C(bytes) val SerializationIdentity = 1.toByte @@ -276,7 +276,7 @@ object ByteString { /** INTERNAL API: ByteString backed by exactly one array, with start / end markers */ private[akka] object ByteString1 extends Companion { val empty: ByteString1 = new ByteString1(Array.emptyByteArray, 0, 0) - def fromString(s: String): ByteString1 = apply(s.getBytes) + def fromString(s: String): ByteString1 = apply(s.getBytes(StandardCharsets.UTF_8)) def apply(bytes: Array[Byte]): ByteString1 = apply(bytes, 0, bytes.length) def apply(bytes: Array[Byte], startIndex: Int, length: Int): ByteString1 = if (length == 0) empty @@ -857,8 +857,8 @@ sealed abstract class ByteString * into it before returning it. * * This method of exposing the bytes of a ByteString can save one array - * copy and allocation in the happy path scenario and which can lead to better performance, - * however it also means that one MUST NOT modify the returned in array, or unexpected + * copy and allocation in the happy path scenario which can lead to better performance, + * however it also means that one MUST NOT modify the returned array, or unexpected * immutable data structure contract-breaking behavior will manifest itself. * * This API is intended for users who need to pass the byte array to some other API, which will diff --git a/akka-actor/src/main/scala-3/akka/util/ByteString.scala b/akka-actor/src/main/scala-3/akka/util/ByteString.scala index 8fd11de7ed..a5413f552d 100644 --- a/akka-actor/src/main/scala-3/akka/util/ByteString.scala +++ b/akka-actor/src/main/scala-3/akka/util/ByteString.scala @@ -160,7 +160,7 @@ object ByteString { private[akka] object ByteString1C extends Companion { val empty = new ByteString1C(Array.emptyByteArray) - def fromString(s: String): ByteString1C = new ByteString1C(s.getBytes) + def fromString(s: String): ByteString1C = new ByteString1C(s.getBytes(StandardCharsets.UTF_8)) def apply(bytes: Array[Byte]): ByteString1C = new ByteString1C(bytes) val SerializationIdentity = 1.toByte @@ -277,7 +277,7 @@ object ByteString { /** INTERNAL API: ByteString backed by exactly one array, with start / end markers */ private[akka] object ByteString1 extends Companion { val empty: ByteString1 = new ByteString1(Array.emptyByteArray, 0, 0) - def fromString(s: String): ByteString1 = apply(s.getBytes) + def fromString(s: String): ByteString1 = apply(s.getBytes(StandardCharsets.UTF_8)) def apply(bytes: Array[Byte]): ByteString1 = apply(bytes, 0, bytes.length) def apply(bytes: Array[Byte], startIndex: Int, length: Int): ByteString1 = if (length == 0) empty @@ -857,8 +857,8 @@ sealed abstract class ByteString * into it before returning it. * * This method of exposing the bytes of a ByteString can save one array - * copy and allocation in the happy path scenario and which can lead to better performance, - * however it also means that one MUST NOT modify the returned in array, or unexpected + * copy and allocation in the happy path scenario which can lead to better performance, + * however it also means that one MUST NOT modify the returned array, or unexpected * immutable data structure contract-breaking behavior will manifest itself. * * This API is intended for users who need to pass the byte array to some other API, which will diff --git a/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala b/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala index 1972744c4e..edd1e4d812 100644 --- a/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/AsyncSerializer.scala @@ -23,7 +23,9 @@ import akka.event.Logging trait AsyncSerializer { /** - * Serializes the given object into an Array of Byte + * Serializes the given object into an Array of Byte. + * + * Note that the array must not be mutated by the serializer after it has been used to complete the returned `Future`. */ def toBinaryAsync(o: AnyRef): Future[Array[Byte]] diff --git a/akka-actor/src/main/scala/akka/serialization/Serializer.scala b/akka-actor/src/main/scala/akka/serialization/Serializer.scala index c4265f3754..bb9fe3027b 100644 --- a/akka-actor/src/main/scala/akka/serialization/Serializer.scala +++ b/akka-actor/src/main/scala/akka/serialization/Serializer.scala @@ -48,7 +48,9 @@ trait Serializer { def identifier: Int /** - * Serializes the given object into an Array of Byte + * Serializes the given object into an Array of Byte. + * + * Note that the array must not be mutated by the serializer after it has been returned. */ def toBinary(o: AnyRef): Array[Byte] @@ -130,7 +132,9 @@ abstract class SerializerWithStringManifest extends Serializer { def manifest(o: AnyRef): String /** - * Serializes the given object into an Array of Byte + * Serializes the given object into an Array of Byte. + * + * Note that the array must not be mutated by the serializer after it has been returned. */ def toBinary(o: AnyRef): Array[Byte] diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala index 695fff7903..10380516bf 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala @@ -8,15 +8,14 @@ import java.{ lang => jl } import java.io.{ ByteArrayInputStream, ByteArrayOutputStream, ObjectOutputStream } import java.io.NotSerializableException import java.util.zip.{ GZIPInputStream, GZIPOutputStream } - import scala.annotation.tailrec import scala.collection.immutable - import akka.actor.{ Address, ExtendedActorSystem } import akka.cluster.metrics._ import akka.cluster.metrics.protobuf.msg.{ ClusterMetricsMessages => cm } import akka.dispatch.Dispatchers -import akka.protobufv3.internal.{ ByteString, MessageLite } +import akka.protobufv3.internal.MessageLite +import akka.remote.ByteStringUtils import akka.serialization.{ BaseSerializer, SerializationExtension, SerializerWithStringManifest, Serializers } import akka.util.ClassLoaderObjectInputStream import akka.util.ccompat._ @@ -122,7 +121,9 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS val builder = cm.MetricsSelector.newBuilder() val serializer = serialization.findSerializerFor(selector) - builder.setData(ByteString.copyFrom(serializer.toBinary(selector))).setSerializerId(serializer.identifier) + builder + .setData(ByteStringUtils.toProtoByteStringUnsafe(serializer.toBinary(selector))) + .setSerializerId(serializer.identifier) val manifest = Serializers.manifestFor(serializer, selector) builder.setManifest(manifest) @@ -198,7 +199,10 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS val out = new ObjectOutputStream(bos) out.writeObject(number) out.close() - Number.newBuilder().setType(NumberType.Serialized).setSerialized(ByteString.copyFrom(bos.toByteArray)) + Number + .newBuilder() + .setType(NumberType.Serialized) + .setSerialized(ByteStringUtils.toProtoByteStringUnsafe(bos.toByteArray)) } } diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala index e271e96f81..2334db2cc9 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala @@ -8,16 +8,15 @@ import java.io.{ ByteArrayInputStream, ByteArrayOutputStream } import java.io.NotSerializableException import java.util.zip.GZIPInputStream import java.util.zip.GZIPOutputStream - import scala.annotation.tailrec import scala.collection.immutable.TreeMap - import akka.actor.{ Address, ExtendedActorSystem } import akka.actor.ActorRef import akka.cluster.pubsub.DistributedPubSubMediator._ import akka.cluster.pubsub.DistributedPubSubMediator.Internal._ import akka.cluster.pubsub.protobuf.msg.{ DistributedPubSubMessages => dm } import akka.protobufv3.internal.{ ByteString, MessageLite } +import akka.remote.ByteStringUtils import akka.serialization._ import akka.util.ccompat._ import akka.util.ccompat.JavaConverters._ @@ -230,7 +229,7 @@ private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActor val msgSerializer = serialization.findSerializerFor(m) val builder = dm.Payload .newBuilder() - .setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(m))) + .setEnclosedMessage(ByteStringUtils.toProtoByteStringUnsafe(msgSerializer.toBinary(m))) .setSerializerId(msgSerializer.identifier) val ms = Serializers.manifestFor(msgSerializer, m) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializer.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializer.scala index 58a5c69019..33369be5ba 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializer.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/delivery/ReliableDeliverySerializer.scala @@ -5,7 +5,6 @@ package akka.cluster.typed.internal.delivery import java.io.NotSerializableException - import akka.actor.typed.ActorRefResolver import akka.actor.typed.delivery.ConsumerController import akka.actor.typed.delivery.DurableProducerQueue @@ -23,6 +22,7 @@ import akka.serialization.BaseSerializer import akka.serialization.SerializerWithStringManifest import akka.util.ccompat.JavaConverters._ import akka.protobufv3.internal.ByteString +import akka.remote.ByteStringUtils /** * INTERNAL API @@ -97,7 +97,7 @@ import akka.protobufv3.internal.ByteString private def chunkedMessageToProto(chunk: ChunkedMessage): Payload.Builder = { val payloadBuilder = ContainerFormats.Payload.newBuilder() - payloadBuilder.setEnclosedMessage(ByteString.copyFrom(chunk.serialized.toArray)) + payloadBuilder.setEnclosedMessage(ByteStringUtils.toProtoByteStringUnsafe(chunk.serialized.toArrayUnsafe())) payloadBuilder.setMessageManifest(ByteString.copyFromUtf8(chunk.manifest)) payloadBuilder.setSerializerId(chunk.serializerId) payloadBuilder @@ -207,7 +207,7 @@ import akka.protobufv3.internal.ByteString val manifest = if (seqMsg.getMessage.hasMessageManifest) seqMsg.getMessage.getMessageManifest.toStringUtf8 else "" ChunkedMessage( - akka.util.ByteString(seqMsg.getMessage.getEnclosedMessage.toByteArray), + akka.util.ByteString.fromArrayUnsafe(seqMsg.getMessage.getEnclosedMessage.toByteArray), seqMsg.getFirstChunk, seqMsg.getLastChunk, seqMsg.getMessage.getSerializerId, @@ -260,7 +260,7 @@ import akka.protobufv3.internal.ByteString val manifest = if (sent.getMessage.hasMessageManifest) sent.getMessage.getMessageManifest.toStringUtf8 else "" ChunkedMessage( - akka.util.ByteString(sent.getMessage.getEnclosedMessage.toByteArray), + akka.util.ByteString.fromArrayUnsafe(sent.getMessage.getEnclosedMessage.toByteArray), sent.getFirstChunk, sent.getLastChunk, sent.getMessage.getSerializerId, diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index d58241139e..b67358a055 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -6,21 +6,19 @@ package akka.cluster.protobuf import java.io.{ ByteArrayInputStream, ByteArrayOutputStream } import java.util.zip.{ GZIPInputStream, GZIPOutputStream } - import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration.Deadline - import scala.annotation.nowarn import com.typesafe.config.{ Config, ConfigFactory, ConfigRenderOptions } - import akka.actor.{ Address, ExtendedActorSystem } import akka.annotation.InternalApi import akka.cluster._ import akka.cluster.InternalClusterAction._ import akka.cluster.protobuf.msg.{ ClusterMessages => cm } import akka.cluster.routing.{ ClusterRouterPool, ClusterRouterPoolSettings } -import akka.protobufv3.internal.{ ByteString, MessageLite } +import akka.protobufv3.internal.MessageLite +import akka.remote.ByteStringUtils import akka.routing.Pool import akka.serialization._ import akka.util.Version @@ -228,7 +226,9 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) private def poolToProto(pool: Pool): cm.Pool = { val builder = cm.Pool.newBuilder() val serializer = serialization.findSerializerFor(pool) - builder.setSerializerId(serializer.identifier).setData(ByteString.copyFrom(serializer.toBinary(pool))) + builder + .setSerializerId(serializer.identifier) + .setData(ByteStringUtils.toProtoByteStringUnsafe(serializer.toBinary(pool))) val manifest = Serializers.manifestFor(serializer, pool) builder.setManifest(manifest) builder.build() @@ -519,7 +519,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) .newBuilder() .setFrom(uniqueAddressToProto(envelope.from)) .setTo(uniqueAddressToProto(envelope.to)) - .setSerializedGossip(ByteString.copyFrom(compress(gossipToProto(envelope.gossip).build))) + .setSerializedGossip(ByteStringUtils.toProtoByteStringUnsafe(compress(gossipToProto(envelope.gossip).build))) .build private def gossipStatusToProto(status: GossipStatus): cm.GossipStatus = { @@ -530,7 +530,7 @@ final class ClusterMessageSerializer(val system: ExtendedActorSystem) .setFrom(uniqueAddressToProto(status.from)) .addAllAllHashes(allHashes.asJava) .setVersion(vectorClockToProto(status.version, hashMapping)) - .setSeenDigest(ByteString.copyFrom(status.seenDigest)) + .setSeenDigest(ByteStringUtils.toProtoByteStringUnsafe(status.seenDigest)) .build() } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala index 7c1e3ddf4a..6f0c1767fe 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatedDataSerializer.scala @@ -10,12 +10,9 @@ import java.util import java.util.ArrayList import java.util.Collections import java.util.Comparator - import scala.annotation.tailrec import scala.collection.immutable - import scala.annotation.nowarn - import akka.actor.ActorRef import akka.actor.ExtendedActorSystem import akka.cluster.ddata._ @@ -23,8 +20,8 @@ import akka.cluster.ddata.Replicator.Internal._ import akka.cluster.ddata.protobuf.msg.{ ReplicatedDataMessages => rd } import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages => dm } import akka.cluster.ddata.protobuf.msg.ReplicatorMessages.OtherMessage -import akka.protobufv3.internal.ByteString import akka.protobufv3.internal.GeneratedMessageV3 +import akka.remote.ByteStringUtils import akka.serialization.BaseSerializer import akka.serialization.Serialization import akka.serialization.SerializerWithStringManifest @@ -624,7 +621,7 @@ class ReplicatedDataSerializer(val system: ExtendedActorSystem) rd.GCounter.Entry .newBuilder() .setNode(uniqueAddressToProto(address)) - .setValue(ByteString.copyFrom(value.toByteArray))) + .setValue(ByteStringUtils.toProtoByteStringUnsafe(value.toByteArray))) } b.build() } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala index 22f47a9155..5712dde06f 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala @@ -7,13 +7,11 @@ package akka.cluster.ddata.protobuf import java.io.NotSerializableException import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger - import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.duration.Duration import scala.concurrent.duration.FiniteDuration - import akka.actor.Address import akka.actor.ExtendedActorSystem import akka.annotation.InternalApi @@ -28,7 +26,7 @@ import akka.cluster.ddata.Replicator._ import akka.cluster.ddata.Replicator.Internal._ import akka.cluster.ddata.VersionVector import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages => dm } -import akka.protobufv3.internal.ByteString +import akka.remote.ByteStringUtils import akka.serialization.BaseSerializer import akka.serialization.Serialization import akka.serialization.SerializerWithStringManifest @@ -267,7 +265,11 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) b.setChunk(status.chunk).setTotChunks(status.totChunks) status.digests.foreach { case (key, digest) => - b.addEntries(dm.Status.Entry.newBuilder().setKey(key).setDigest(ByteString.copyFrom(digest.toArray))) + b.addEntries( + dm.Status.Entry + .newBuilder() + .setKey(key) + .setDigest(ByteStringUtils.toProtoByteStringUnsafe(digest.toArrayUnsafe()))) } status.toSystemUid.foreach(b.setToSystemUid) // can be None when sending back to a node of version 2.5.21 b.setFromSystemUid(status.fromSystemUid.get) @@ -279,7 +281,9 @@ class ReplicatorMessageSerializer(val system: ExtendedActorSystem) val toSystemUid = if (status.hasToSystemUid) Some(status.getToSystemUid) else None val fromSystemUid = if (status.hasFromSystemUid) Some(status.getFromSystemUid) else None Status( - status.getEntriesList.asScala.iterator.map(e => e.getKey -> AkkaByteString(e.getDigest.toByteArray())).toMap, + status.getEntriesList.asScala.iterator + .map(e => e.getKey -> AkkaByteString.fromArrayUnsafe(e.getDigest.toByteArray())) + .toMap, status.getChunk, status.getTotChunks, toSystemUid, diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala index 01bb5baa34..c29601fc1a 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala @@ -8,10 +8,8 @@ import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream import java.util.zip.GZIPInputStream import java.util.zip.GZIPOutputStream - import scala.annotation.tailrec import scala.collection.immutable.TreeMap - import akka.actor.ActorRef import akka.actor.Address import akka.actor.ExtendedActorSystem @@ -20,6 +18,7 @@ import akka.cluster.ddata.VersionVector import akka.cluster.ddata.protobuf.msg.{ ReplicatorMessages => dm } import akka.protobufv3.internal.ByteString import akka.protobufv3.internal.MessageLite +import akka.remote.ByteStringUtils import akka.serialization._ import akka.util.ccompat._ import akka.util.ccompat.JavaConverters._ @@ -143,7 +142,7 @@ trait SerializationSupport { val msgSerializer = serialization.findSerializerFor(m) val builder = dm.OtherMessage .newBuilder() - .setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(m))) + .setEnclosedMessage(ByteStringUtils.toProtoByteStringUnsafe(msgSerializer.toBinary(m))) .setSerializerId(msgSerializer.identifier) val ms = Serializers.manifestFor(msgSerializer, m) diff --git a/akka-persistence-typed/src/main/scala/akka/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala b/akka-persistence-typed/src/main/scala/akka/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala index 5e865e2bfa..038640bbf6 100644 --- a/akka-persistence-typed/src/main/scala/akka/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala +++ b/akka-persistence-typed/src/main/scala/akka/persistence/typed/serialization/ReplicatedEventSourcingSerializer.scala @@ -7,7 +7,6 @@ package akka.persistence.typed.serialization import java.io.NotSerializableException import java.util.{ ArrayList, Collections, Comparator } import java.{ lang => jl } - import akka.actor.ExtendedActorSystem import akka.annotation.InternalApi import akka.persistence.typed.PersistenceId @@ -18,7 +17,7 @@ import akka.persistence.typed.internal.ReplicatedEventMetadata import akka.persistence.typed.internal.ReplicatedSnapshotMetadata import akka.persistence.typed.internal.ReplicatedPublishedEventMetaData import akka.persistence.typed.internal.VersionVector -import akka.protobufv3.internal.ByteString +import akka.remote.ByteStringUtils import akka.remote.ContainerFormats.Payload import akka.remote.serialization.WrappedPayloadSupport import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } @@ -192,14 +191,14 @@ import scala.collection.immutable.TreeMap def counterToProtoByteArray(counter: Counter): Array[Byte] = ReplicatedEventSourcing.Counter .newBuilder() - .setValue(ByteString.copyFrom(counter.value.toByteArray)) + .setValue(ByteStringUtils.toProtoByteStringUnsafe(counter.value.toByteArray)) .build() .toByteArray def counterUpdatedToProtoBufByteArray(updated: Counter.Updated): Array[Byte] = ReplicatedEventSourcing.CounterUpdate .newBuilder() - .setDelta(ByteString.copyFrom(updated.delta.toByteArray)) + .setDelta(ByteStringUtils.toProtoByteStringUnsafe(updated.delta.toByteArray)) .build() .toByteArray diff --git a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala index 5983bf5acc..0ed2463d7c 100644 --- a/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala +++ b/akka-persistence/src/main/scala/akka/persistence/serialization/MessageSerializer.scala @@ -5,14 +5,11 @@ package akka.persistence.serialization import java.io.NotSerializableException - import scala.collection.immutable import scala.collection.immutable.VectorBuilder import scala.concurrent.duration import scala.concurrent.duration.Duration - import scala.annotation.nowarn - import akka.actor.{ ActorPath, ExtendedActorSystem } import akka.actor.Actor import akka.persistence._ @@ -20,6 +17,7 @@ import akka.persistence.AtLeastOnceDelivery._ import akka.persistence.fsm.PersistentFSM.{ PersistentFSMSnapshot, StateChangeEvent } import akka.persistence.serialization.{ MessageFormats => mf } import akka.protobufv3.internal.ByteString +import akka.protobufv3.internal.UnsafeByteOperations import akka.serialization._ import akka.util.ccompat._ @@ -187,7 +185,7 @@ class MessageSerializer(val system: ExtendedActorSystem) extends BaseSerializer val ms = Serializers.manifestFor(serializer, payload) if (ms.nonEmpty) builder.setPayloadManifest(ByteString.copyFromUtf8(ms)) - builder.setPayload(ByteString.copyFrom(serializer.toBinary(payload))) + builder.setPayload(UnsafeByteOperations.unsafeWrap(serializer.toBinary(payload))) builder.setSerializerId(serializer.identifier) builder } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index fdd2a8811d..25c9109da4 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -28,7 +28,7 @@ object MaxThroughputSpec extends MultiNodeConfig { val cfg = ConfigFactory.parseString(s""" # for serious measurements you should increase the totalMessagesFactor (80) - akka.test.MaxThroughputSpec.totalMessagesFactor = 10.0 + akka.test.MaxThroughputSpec.totalMessagesFactor = 160.0 akka.test.MaxThroughputSpec.real-message = off akka.test.MaxThroughputSpec.actor-selection = off akka { diff --git a/akka-remote/src/main/scala/akka/remote/ByteStringUtils.scala b/akka-remote/src/main/scala/akka/remote/ByteStringUtils.scala new file mode 100644 index 0000000000..65d39fbdc5 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/ByteStringUtils.scala @@ -0,0 +1,39 @@ +/* + * Copyright (C) 2009-2021 Lightbend Inc. + */ + +package akka.remote + +import akka.annotation.InternalApi +import akka.protobufv3.internal.UnsafeByteOperations +import akka.util.ByteString +import akka.protobufv3.internal.{ ByteString => ProtoByteString } +import akka.util.ByteString.ByteString1 +import akka.util.ByteString.ByteString1C + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object ByteStringUtils { + def toProtoByteStringUnsafe(bytes: ByteString): ProtoByteString = { + if (bytes.isEmpty) + ProtoByteString.EMPTY + else if (bytes.isInstanceOf[ByteString1C] || (bytes.isInstanceOf[ByteString1] && bytes.isCompact)) { + UnsafeByteOperations.unsafeWrap(bytes.toArrayUnsafe()) + } else { + // zero copy, reuse the same underlying byte arrays + bytes.asByteBuffers.foldLeft(ProtoByteString.EMPTY) { (acc, byteBuffer) => + acc.concat(UnsafeByteOperations.unsafeWrap(byteBuffer)) + } + } + } + + def toProtoByteStringUnsafe(bytes: Array[Byte]): ProtoByteString = { + if (bytes.isEmpty) + ProtoByteString.EMPTY + else { + UnsafeByteOperations.unsafeWrap(bytes) + } + } +} diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 5ef21f7770..f53177663b 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -5,7 +5,6 @@ package akka.remote import scala.util.control.NonFatal - import akka.actor.ExtendedActorSystem import akka.annotation.InternalApi import akka.protobufv3.internal.ByteString @@ -52,7 +51,7 @@ private[akka] object MessageSerializer { if (oldInfo eq null) Serialization.currentTransportInformation.value = system.provider.serializationInformation - builder.setMessage(ByteString.copyFrom(serializer.toBinary(message))) + builder.setMessage(ByteStringUtils.toProtoByteStringUnsafe(serializer.toBinary(message))) builder.setSerializerId(serializer.identifier) val ms = Serializers.manifestFor(serializer, message) diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala index 6fa13f999e..5f7b0bb7d0 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala @@ -41,18 +41,19 @@ import akka.util.ByteString * The streamId` is encoded as 1 byte. */ def encodeConnectionHeader(streamId: Int): ByteString = - Magic ++ ByteString(streamId.toByte) + Magic ++ ByteString.fromArrayUnsafe(Array(streamId.toByte)) /** * Each frame starts with the frame header that contains the length * of the frame. The `frameLength` is encoded as 4 bytes (little endian). */ def encodeFrameHeader(frameLength: Int): ByteString = - ByteString( - (frameLength & 0xff).toByte, - ((frameLength & 0xff00) >> 8).toByte, - ((frameLength & 0xff0000) >> 16).toByte, - ((frameLength & 0xff000000) >> 24).toByte) + ByteString.fromArrayUnsafe( + Array[Byte]( + (frameLength & 0xff).toByte, + ((frameLength & 0xff00) >> 8).toByte, + ((frameLength & 0xff0000) >> 16).toByte, + ((frameLength & 0xff000000) >> 24).toByte)) } /** diff --git a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala index d6a2f6a07d..716225c9bb 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/DaemonMsgCreateSerializer.scala @@ -6,12 +6,12 @@ package akka.remote.serialization import scala.collection.immutable import scala.reflect.ClassTag - import com.typesafe.config.{ Config, ConfigFactory } -import util.{ Failure, Success } +import util.{ Failure, Success } import akka.actor.{ Deploy, ExtendedActorSystem, NoScopeGiven, Props, Scope } import akka.protobufv3.internal.ByteString +import akka.remote.ByteStringUtils import akka.remote.DaemonMsgCreate import akka.remote.WireFormats.{ DaemonMsgCreateData, DeployData, PropsData } import akka.routing.{ NoRouter, RouterConfig } @@ -46,21 +46,21 @@ private[akka] final class DaemonMsgCreateSerializer(val system: ExtendedActorSys val (serId, _, manifest, bytes) = serialize(d.config) builder.setConfigSerializerId(serId) builder.setConfigManifest(manifest) - builder.setConfig(ByteString.copyFrom(bytes)) + builder.setConfig(ByteStringUtils.toProtoByteStringUnsafe(bytes)) } if (d.routerConfig != NoRouter) { val (serId, _, manifest, bytes) = serialize(d.routerConfig) builder.setRouterConfigSerializerId(serId) builder.setRouterConfigManifest(manifest) - builder.setRouterConfig(ByteString.copyFrom(bytes)) + builder.setRouterConfig(ByteStringUtils.toProtoByteStringUnsafe(bytes)) } if (d.scope != NoScopeGiven) { val (serId, _, manifest, bytes) = serialize(d.scope) builder.setScopeSerializerId(serId) builder.setScopeManifest(manifest) - builder.setScope(ByteString.copyFrom(bytes)) + builder.setScope(ByteStringUtils.toProtoByteStringUnsafe(bytes)) } if (d.dispatcher != NoDispatcherGiven) { @@ -76,7 +76,7 @@ private[akka] final class DaemonMsgCreateSerializer(val system: ExtendedActorSys val builder = PropsData.newBuilder.setClazz(props.clazz.getName).setDeploy(deployProto(props.deploy)) props.args.foreach { arg => val (serializerId, hasManifest, manifest, bytes) = serialize(arg) - builder.addArgs(ByteString.copyFrom(bytes)) + builder.addArgs(ByteStringUtils.toProtoByteStringUnsafe(bytes)) builder.addManifests(manifest) builder.addSerializerIds(serializerId) builder.addHasManifest(hasManifest) diff --git a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala index 233b66cbff..ad98426387 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/MessageContainerSerializer.scala @@ -5,7 +5,6 @@ package akka.remote.serialization import scala.collection.immutable - import akka.actor.ActorSelectionMessage import akka.actor.ExtendedActorSystem import akka.actor.SelectChildName @@ -13,6 +12,7 @@ import akka.actor.SelectChildPattern import akka.actor.SelectParent import akka.actor.SelectionPathElement import akka.protobufv3.internal.ByteString +import akka.remote.ByteStringUtils import akka.remote.ContainerFormats import akka.serialization.{ BaseSerializer, SerializationExtension, Serializers } import akka.util.ccompat._ @@ -36,7 +36,7 @@ class MessageContainerSerializer(val system: ExtendedActorSystem) extends BaseSe val message = sel.msg.asInstanceOf[AnyRef] val serializer = serialization.findSerializerFor(message) builder - .setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(message))) + .setEnclosedMessage(ByteStringUtils.toProtoByteStringUnsafe(serializer.toBinary(message))) .setSerializerId(serializer.identifier) .setWildcardFanOut(sel.wildcardFanOut) diff --git a/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala b/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala index 3917fffb00..4aa65f210f 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/WrappedPayloadSupport.scala @@ -7,6 +7,7 @@ package akka.remote.serialization import akka.actor.ExtendedActorSystem import akka.event.Logging import akka.protobufv3.internal.ByteString +import akka.remote.ByteStringUtils import akka.remote.ContainerFormats import akka.serialization.{ SerializationExtension, Serializers } import akka.serialization.DisabledJavaSerializer @@ -41,14 +42,14 @@ private[akka] class WrappedPayloadSupport(system: ExtendedActorSystem) { notSerializableException.originalMessage) val serializer2 = serialization.findSerializerFor(notSerializableException) builder - .setEnclosedMessage(ByteString.copyFrom(serializer2.toBinary(notSerializableException))) + .setEnclosedMessage(ByteStringUtils.toProtoByteStringUnsafe(serializer2.toBinary(notSerializableException))) .setSerializerId(serializer2.identifier) val manifest = Serializers.manifestFor(serializer2, notSerializableException) if (manifest.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) case _ => builder - .setEnclosedMessage(ByteString.copyFrom(serializer.toBinary(payload))) + .setEnclosedMessage(ByteStringUtils.toProtoByteStringUnsafe(serializer.toBinary(payload))) .setSerializerId(serializer.identifier) val manifest = Serializers.manifestFor(serializer, payload) if (manifest.nonEmpty) builder.setMessageManifest(ByteString.copyFromUtf8(manifest)) diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index ec95a219eb..4ef8a51eaa 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -5,10 +5,8 @@ package akka.remote.transport import scala.annotation.nowarn - import akka.AkkaException import akka.actor.{ ActorRef, Address, AddressFromURIString, InternalActorRef } -import akka.protobufv3.internal.{ ByteString => PByteString } import akka.protobufv3.internal.InvalidProtocolBufferException import akka.remote._ import akka.remote.WireFormats._ @@ -158,19 +156,15 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { envelopeBuilder.setMessage(serializedMessage) ackAndEnvelopeBuilder.setEnvelope(envelopeBuilder) - ByteString.ByteString1C(ackAndEnvelopeBuilder.build.toByteArray) //Reuse Byte Array (naughty!) + ByteString.fromArrayUnsafe(ackAndEnvelopeBuilder.build.toByteArray) } override def constructPureAck(ack: Ack): ByteString = - ByteString.ByteString1C(AckAndEnvelopeContainer.newBuilder.setAck(ackBuilder(ack)).build().toByteArray) //Reuse Byte Array (naughty!) + ByteString.fromArrayUnsafe(AckAndEnvelopeContainer.newBuilder.setAck(ackBuilder(ack)).build().toByteArray) override def constructPayload(payload: ByteString): ByteString = - ByteString.ByteString1C( - AkkaProtocolMessage - .newBuilder() - .setPayload(PByteString.copyFrom(payload.asByteBuffer)) - .build - .toByteArray) //Reuse Byte Array (naughty!) + ByteString.fromArrayUnsafe( + AkkaProtocolMessage.newBuilder().setPayload(ByteStringUtils.toProtoByteStringUnsafe(payload)).build.toByteArray) override def constructAssociate(info: HandshakeInfo): ByteString = { val handshakeInfo = AkkaHandshakeInfo.newBuilder.setOrigin(serializeAddress(info.origin)).setUid(info.uid.toLong) @@ -194,8 +188,8 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { override def decodePdu(raw: ByteString): AkkaPdu = { try { - val pdu = AkkaProtocolMessage.parseFrom(raw.toArray) - if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer())) + val pdu = AkkaProtocolMessage.parseFrom(raw.toArrayUnsafe()) + if (pdu.hasPayload) Payload(ByteString.fromByteBuffer(pdu.getPayload.asReadOnlyByteBuffer())) else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction) else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null) @@ -208,7 +202,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { raw: ByteString, provider: RemoteActorRefProvider, localAddress: Address): (Option[Ack], Option[Message]) = { - val ackAndEnvelope = AckAndEnvelopeContainer.parseFrom(raw.toArray) + val ackAndEnvelope = AckAndEnvelopeContainer.parseFrom(raw.toArrayUnsafe()) val ackOption = if (ackAndEnvelope.hasAck) { import akka.util.ccompat.JavaConverters._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamGraphStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamGraphStage.scala index 39020f2e1d..db3eaec3ed 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamGraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/OutputStreamGraphStage.scala @@ -50,7 +50,7 @@ private[akka] final class OutputStreamGraphStage(factory: () => OutputStream, au override def onPush(): Unit = { val next = grab(in) try { - outputStream.write(next.toArray) + outputStream.write(next.toArrayUnsafe()) if (autoFlush) outputStream.flush() bytesWritten += next.size diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala index b74594ca25..a904232c8e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateCompressor.scala @@ -34,7 +34,7 @@ import akka.util.{ ByteString, ByteStringBuilder } protected def compressWithBuffer(input: ByteString, buffer: Array[Byte]): ByteString = { require(deflater.needsInput()) - deflater.setInput(input.toArray) + deflater.setInput(input.toArrayUnsafe()) drainDeflater(deflater, buffer) } protected def flushWithBuffer(buffer: Array[Byte]): ByteString = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala index 2091b72252..04e61f40e4 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/DeflateDecompressorBase.scala @@ -32,7 +32,7 @@ import akka.util.ByteString abstract class Inflate(noPostProcessing: Boolean) extends ParseStep[ByteString] { override def canWorkWithPartialData = true override def parse(reader: ByteStringParser.ByteReader): ParseResult[ByteString] = { - inflater.setInput(reader.remainingData.toArray) + inflater.setInput(reader.remainingData.toArrayUnsafe()) val read = inflater.inflate(buffer) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala index bc537f33b2..1884abf9cb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipCompressor.scala @@ -26,7 +26,7 @@ import akka.util.ByteString header() ++ super.finishWithBuffer(buffer) ++ trailer() private def updateCrc(input: ByteString): Unit = { - checkSum.update(input.toArray) + checkSum.update(input.toArrayUnsafe()) bytesRead += input.length } private def header(): ByteString = diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala index e7504d98e7..a5cbdb34d5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/compression/GzipDecompressor.scala @@ -64,7 +64,7 @@ import akka.util.ByteString } private def crc16(data: ByteString) = { val crc = new CRC32 - crc.update(data.toArray) + crc.update(data.toArrayUnsafe()) crc.getValue.toInt & 0xFFFF } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala index 6deb9dc409..e03efa0084 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Framing.scala @@ -83,6 +83,7 @@ object Framing { * For example, frame can have a shape like this: `[offset bytes][body size bytes][body bytes][footer bytes]`. * Then computeFrameSize can be used to compute the frame size: `(offset bytes, computed size) => (actual frame size)`. * ''Actual frame size'' must be equal or bigger than sum of `fieldOffset` and `fieldLength`, the operator fails otherwise. + * Must not mutate the given byte array. * */ def lengthField( @@ -415,7 +416,7 @@ object Framing { } else if (buffSize >= minimumChunkSize) { val parsedLength = intDecoder(buffer.iterator.drop(lengthFieldOffset), lengthFieldLength) frameSize = computeFrameSize match { - case Some(f) => f(buffer.take(lengthFieldOffset).toArray, parsedLength) + case Some(f) => f(buffer.take(lengthFieldOffset).toArrayUnsafe(), parsedLength) case None => parsedLength + minimumChunkSize } if (frameSize > maximumFrameLength) { diff --git a/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala b/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala index 005805467d..0c3f7cbf13 100644 --- a/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala +++ b/akka-stream/src/main/scala/akka/stream/serialization/StreamRefSerializer.scala @@ -7,10 +7,13 @@ package akka.stream.serialization import akka.actor.ExtendedActorSystem import akka.annotation.InternalApi import akka.protobufv3.internal.ByteString +import akka.protobufv3.internal.UnsafeByteOperations import akka.serialization._ import akka.stream.StreamRefMessages import akka.stream.impl.streamref._ +import java.nio.charset.StandardCharsets + /** INTERNAL API */ @InternalApi private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) @@ -86,7 +89,10 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) private def serializeRemoteSinkFailure( d: StreamRefsProtocol.RemoteStreamFailure): StreamRefMessages.RemoteStreamFailure = { - StreamRefMessages.RemoteStreamFailure.newBuilder().setCause(ByteString.copyFrom(d.msg.getBytes)).build() + StreamRefMessages.RemoteStreamFailure + .newBuilder() + .setCause(UnsafeByteOperations.unsafeWrap(d.msg.getBytes(StandardCharsets.UTF_8))) + .build() } private def serializeRemoteSinkCompleted( @@ -108,7 +114,7 @@ private[akka] final class StreamRefSerializer(val system: ExtendedActorSystem) val payloadBuilder = StreamRefMessages.Payload .newBuilder() - .setEnclosedMessage(ByteString.copyFrom(msgSerializer.toBinary(p))) + .setEnclosedMessage(UnsafeByteOperations.unsafeWrap(msgSerializer.toBinary(p))) .setSerializerId(msgSerializer.identifier) val ms = Serializers.manifestFor(msgSerializer, p)