diff --git a/akka-core/src/main/scala/dataflow/DataFlowVariable.scala b/akka-core/src/main/scala/dataflow/DataFlowVariable.scala index c6eff59495..787793dc5f 100644 --- a/akka-core/src/main/scala/dataflow/DataFlowVariable.scala +++ b/akka-core/src/main/scala/dataflow/DataFlowVariable.scala @@ -85,13 +85,19 @@ object DataFlow { /** Sets the value of this variable (if unset) with the value of the supplied variable */ - def <<(ref: DataFlowVariable[T]): Unit = - if(this.value.get.isEmpty) in ! Set(ref()) + def <<(ref: DataFlowVariable[T]) { + if (this.value.get.isEmpty) in ! Set(ref()) + else throw new DataFlowVariableException( + "Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])") + } /** Sets the value of this variable (if unset) */ - def <<(value: T): Unit = - if(this.value.get.isEmpty) in ! Set(value) + def <<(value: T) { + if (this.value.get.isEmpty) in ! Set(value) + else throw new DataFlowVariableException( + "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])") + } /** Retrieves the value of variable * throws a DataFlowVariableException if it times out @@ -99,9 +105,16 @@ object DataFlow { def apply(): T = { value.get getOrElse { val out = actorOf(new Out(this)).start - blockedReaders offer out - val result = (out !! Get).as[T] - out ! Exit + + val result = try { + blockedReaders offer out + (out !! Get).as[T] + } catch { + case e: Exception => + out ! Exit + throw e + } + result.getOrElse(throw new DataFlowVariableException("Timed out (after " + timeoutMs + " milliseconds) while waiting for result")) } }