GZIPInputStream is not closed at all at places, where it's instantiated, which leads to off-heap memory leak with every deserialized message.

GZIPInputStream uses Inflater internally (so also native zlib). Inflater frees up memory only on explicit call to end() or during finalization (finalize() contains only call to end()), so GZIPInputStream should always be explicitly closed.

As native libraries are used a non-scalaish try-finally is used to avoid off-heap memory leak for GZIPInputStream and GZIPOutputStream in case of exceptions.
This commit is contained in:
Kirill Plyashkevich 2016-02-15 16:15:05 +01:00
parent 10d3af1478
commit 35ffc49226
5 changed files with 20 additions and 15 deletions

View file

@ -42,8 +42,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS
def compress(msg: MessageLite): Array[Byte] = {
val bos = new ByteArrayOutputStream(BufferSize)
val zip = new GZIPOutputStream(bos)
msg.writeTo(zip)
zip.close()
try msg.writeTo(zip)
finally zip.close()
bos.toByteArray
}
@ -59,7 +59,8 @@ class MessageSerializer(val system: ExtendedActorSystem) extends SerializerWithS
readChunk()
}
readChunk()
try readChunk()
finally in.close()
out.toByteArray
}

View file

@ -274,8 +274,8 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
private def compress(msg: MessageLite): Array[Byte] = {
val bos = new ByteArrayOutputStream(BufferSize)
val zip = new GZIPOutputStream(bos)
msg.writeTo(zip)
zip.close()
try msg.writeTo(zip)
finally zip.close()
bos.toByteArray
}
@ -291,7 +291,8 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
readChunk()
}
readChunk()
try readChunk()
finally in.close()
out.toByteArray
}

View file

@ -81,8 +81,8 @@ private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActor
private def compress(msg: MessageLite): Array[Byte] = {
val bos = new ByteArrayOutputStream(BufferSize)
val zip = new GZIPOutputStream(bos)
msg.writeTo(zip)
zip.close()
try msg.writeTo(zip)
finally zip.close()
bos.toByteArray
}
@ -98,7 +98,8 @@ private[akka] class DistributedPubSubMessageSerializer(val system: ExtendedActor
readChunk()
}
readChunk()
try readChunk()
finally in.close()
out.toByteArray
}

View file

@ -81,8 +81,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
def compress(msg: MessageLite): Array[Byte] = {
val bos = new ByteArrayOutputStream(BufferSize)
val zip = new GZIPOutputStream(bos)
msg.writeTo(zip)
zip.close()
try msg.writeTo(zip)
finally zip.close()
bos.toByteArray
}
@ -98,7 +98,8 @@ class ClusterMessageSerializer(val system: ExtendedActorSystem) extends BaseSeri
readChunk()
}
readChunk()
try readChunk()
finally in.close()
out.toByteArray
}

View file

@ -56,8 +56,8 @@ trait SerializationSupport {
def compress(msg: MessageLite): Array[Byte] = {
val bos = new ByteArrayOutputStream(BufferSize)
val zip = new GZIPOutputStream(bos)
msg.writeTo(zip)
zip.close()
try msg.writeTo(zip)
finally zip.close()
bos.toByteArray
}
@ -73,7 +73,8 @@ trait SerializationSupport {
readChunk()
}
readChunk()
try readChunk()
finally in.close()
out.toByteArray
}