From d0bf45fe68cf01c22259c7acf8768ef4ef3121e7 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Thu, 28 Jul 2016 08:23:47 -0400 Subject: [PATCH] =str 20711 IllegalMonitorStateException in Source.queue --- .../src/main/scala/akka/stream/stage/GraphStage.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala index 493335fa9c..a66f8fe2d5 100644 --- a/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala +++ b/akka-stream/src/main/scala/akka/stream/stage/GraphStage.scala @@ -1309,14 +1309,12 @@ private[akka] trait CallbackWrapper[T] extends AsyncCallback[T] { callbackState.get() match { case Initialized(cb) ⇒ cb(arg) case list @ NotInitialized(l) ⇒ callbackState.compareAndSet(list, NotInitialized(arg :: l)) - case Stopped(cb) ⇒ - lock.unlock() - cb(arg) + case Stopped(cb) ⇒ cb(arg) } } private[this] def locked(body: ⇒ Unit): Unit = { lock.lock() - try body finally if (lock.isLocked) lock.unlock() + try body finally lock.unlock() } }