From 35ffc492263a6f758e7f05054ba8264625ae5dae Mon Sep 17 00:00:00 2001 From: Kirill Plyashkevich Date: Mon, 15 Feb 2016 16:15:05 +0100 Subject: [PATCH] GZIPInputStream is not closed at all at places, where it's instantiated, which leads to off-heap memory leak with every deserialized message. GZIPInputStream uses Inflater internally (so also native zlib). Inflater frees up memory only on explicit call to end() or during finalization (finalize() contains only call to end()), so GZIPInputStream should always be explicitly closed. As native libraries are used a non-scalaish try-finally is used to avoid off-heap memory leak for GZIPInputStream and GZIPOutputStream in case of exceptions. --- .../akka/cluster/metrics/protobuf/MessageSerializer.scala | 7 ++++--- .../protobuf/ClusterShardingMessageSerializer.scala | 7 ++++--- .../protobuf/DistributedPubSubMessageSerializer.scala | 7 ++++--- .../akka/cluster/protobuf/ClusterMessageSerializer.scala | 7 ++++--- .../akka/cluster/ddata/protobuf/SerializationSupport.scala | 7 ++++--- 5 files changed, 20 insertions(+), 15 deletions(-) diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala index 2cf7c091f1..24c976ad53 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala @@ -42,8 +42,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) val zip = new GZIPOutputStream(bos) - msg.writeTo(zip) - zip.close() + try msg.writeTo(zip) + finally zip.close() bos.toByteArray } @@ -59,7 +59,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS readChunk() } - readChunk() + try readChunk() + finally in.close() out.toByteArray } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index edb6bd19cf..93168c1db9 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -274,8 +274,8 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy private def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) val zip = new GZIPOutputStream(bos) - msg.writeTo(zip) - zip.close() + try msg.writeTo(zip) + finally zip.close() bos.toByteArray } @@ -291,7 +291,8 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy readChunk() } - readChunk() + try readChunk() + finally in.close() out.toByteArray } diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala index cdb24a2647..3375fd7c4d 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala @@ -81,8 +81,8 @@ private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActor private def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) val zip = new GZIPOutputStream(bos) - msg.writeTo(zip) - zip.close() + try msg.writeTo(zip) + finally zip.close() bos.toByteArray } @@ -98,7 +98,8 @@ private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActor readChunk() } - readChunk() + try readChunk() + finally in.close() out.toByteArray } diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index a8d141c6a9..a78ce073cf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -81,8 +81,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) val zip = new GZIPOutputStream(bos) - msg.writeTo(zip) - zip.close() + try msg.writeTo(zip) + finally zip.close() bos.toByteArray } @@ -98,7 +98,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri readChunk() } - readChunk() + try readChunk() + finally in.close() out.toByteArray } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala index 578775d5c7..40b0837d6a 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala @@ -56,8 +56,8 @@ trait SerializationSupport { def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) val zip = new GZIPOutputStream(bos) - msg.writeTo(zip) - zip.close() + try msg.writeTo(zip) + finally zip.close() bos.toByteArray } @@ -73,7 +73,8 @@ trait SerializationSupport { readChunk() } - readChunk() + try readChunk() + finally in.close() out.toByteArray }