diff --git a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala index 2cf7c091f1..24c976ad53 100644 --- a/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala +++ b/akka-cluster-metrics/src/main/scala/akka/cluster/metrics/protobuf/MessageSerializer.scala @@ -42,8 +42,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) val zip = new GZIPOutputStream(bos) - msg.writeTo(zip) - zip.close() + try msg.writeTo(zip) + finally zip.close() bos.toByteArray } @@ -59,7 +59,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS readChunk() } - readChunk() + try readChunk() + finally in.close() out.toByteArray } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index edb6bd19cf..93168c1db9 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -274,8 +274,8 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy private def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) val zip = new GZIPOutputStream(bos) - msg.writeTo(zip) - zip.close() + try msg.writeTo(zip) + finally zip.close() bos.toByteArray } @@ -291,7 +291,8 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy readChunk() } - readChunk() + try readChunk() + finally in.close() out.toByteArray } diff --git a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala index cdb24a2647..3375fd7c4d 100644 --- a/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala +++ b/akka-cluster-tools/src/main/scala/akka/cluster/pubsub/protobuf/DistributedPubSubMessageSerializer.scala @@ -81,8 +81,8 @@ private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActor private def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) val zip = new GZIPOutputStream(bos) - msg.writeTo(zip) - zip.close() + try msg.writeTo(zip) + finally zip.close() bos.toByteArray } @@ -98,7 +98,8 @@ private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActor readChunk() } - readChunk() + try readChunk() + finally in.close() out.toByteArray } diff --git a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala index a8d141c6a9..a78ce073cf 100644 --- a/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/protobuf/ClusterMessageSerializer.scala @@ -81,8 +81,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) val zip = new GZIPOutputStream(bos) - msg.writeTo(zip) - zip.close() + try msg.writeTo(zip) + finally zip.close() bos.toByteArray } @@ -98,7 +98,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri readChunk() } - readChunk() + try readChunk() + finally in.close() out.toByteArray } diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala index 578775d5c7..40b0837d6a 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/protobuf/SerializationSupport.scala @@ -56,8 +56,8 @@ trait SerializationSupport { def compress(msg: MessageLite): Array[Byte] = { val bos = new ByteArrayOutputStream(BufferSize) val zip = new GZIPOutputStream(bos) - msg.writeTo(zip) - zip.close() + try msg.writeTo(zip) + finally zip.close() bos.toByteArray } @@ -73,7 +73,8 @@ trait SerializationSupport { readChunk() } - readChunk() + try readChunk() + finally in.close() out.toByteArray }