Fixing ActorModelSpec for CallingThreadDispatcher
This commit is contained in:
parent
df27942402
commit
5d4ef80618
3 changed files with 8 additions and 19 deletions
|
|
@ -78,7 +78,7 @@ object ActorModelSpec {
|
||||||
case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff()
|
case Increment(count) ⇒ ack; count.incrementAndGet(); busy.switchOff()
|
||||||
case CountDownNStop(l) ⇒ ack; l.countDown(); self.stop(); busy.switchOff()
|
case CountDownNStop(l) ⇒ ack; l.countDown(); self.stop(); busy.switchOff()
|
||||||
case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested")
|
case Restart ⇒ ack; busy.switchOff(); throw new Exception("Restart requested")
|
||||||
case Interrupt ⇒ ack; busy.switchOff(); throw new InterruptedException("Ping!")
|
case Interrupt ⇒ ack; sender ! Status.Failure(new ActorInterruptedException(new InterruptedException("Ping!"))); busy.switchOff(); throw new InterruptedException("Ping!")
|
||||||
case ThrowException(e: Throwable) ⇒ ack; busy.switchOff(); throw e
|
case ThrowException(e: Throwable) ⇒ ack; busy.switchOff(); throw e
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -379,26 +379,17 @@ abstract class ActorModelSpec extends AkkaSpec {
|
||||||
val a = newTestActor(dispatcher)
|
val a = newTestActor(dispatcher)
|
||||||
val f1 = a ? Reply("foo")
|
val f1 = a ? Reply("foo")
|
||||||
val f2 = a ? Reply("bar")
|
val f2 = a ? Reply("bar")
|
||||||
val f3 = try {
|
val f3 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ new KeptPromise(Left(ActorInterruptedException(ie))) }
|
||||||
a ? Interrupt
|
|
||||||
} catch {
|
|
||||||
// CallingThreadDispatcher throws IE directly
|
|
||||||
case ie: InterruptedException ⇒ new KeptPromise(Left(ActorInterruptedException(ie)))
|
|
||||||
}
|
|
||||||
val f4 = a ? Reply("foo2")
|
val f4 = a ? Reply("foo2")
|
||||||
val f5 = try {
|
val f5 = try { a ? Interrupt } catch { case ie: InterruptedException ⇒ new KeptPromise(Left(ActorInterruptedException(ie))) }
|
||||||
a ? Interrupt
|
|
||||||
} catch {
|
|
||||||
case ie: InterruptedException ⇒ new KeptPromise(Left(ActorInterruptedException(ie)))
|
|
||||||
}
|
|
||||||
val f6 = a ? Reply("bar2")
|
val f6 = a ? Reply("bar2")
|
||||||
|
|
||||||
assert(f1.get === "foo")
|
assert(f1.get === "foo")
|
||||||
assert(f2.get === "bar")
|
assert(f2.get === "bar")
|
||||||
assert(f4.get === "foo2")
|
assert(f4.get === "foo2")
|
||||||
assert(f3.value === None)
|
assert(intercept[ActorInterruptedException](f3.get).getMessage === "Ping!")
|
||||||
assert(f6.get === "bar2")
|
assert(f6.get === "bar2")
|
||||||
assert(f5.value === None)
|
assert(intercept[ActorInterruptedException](f5.get).getMessage === "Ping!")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -41,7 +41,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
|
||||||
private val remoteDaemonConnectionManager = new RemoteConnectionManager(app, remote)
|
private val remoteDaemonConnectionManager = new RemoteConnectionManager(app, remote)
|
||||||
|
|
||||||
private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = local.theOneWhoWalksTheBubblesOfSpaceTime
|
private[akka] def theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = local.theOneWhoWalksTheBubblesOfSpaceTime
|
||||||
private[akka] def terminationFuture = new DefaultPromise[AkkaApplication.ExitStatus](Timeout.never)(app.dispatcher)
|
private[akka] def terminationFuture = local.terminationFuture
|
||||||
|
|
||||||
def defaultDispatcher = app.dispatcher
|
def defaultDispatcher = app.dispatcher
|
||||||
def defaultTimeout = app.AkkaConfig.ActorTimeout
|
def defaultTimeout = app.AkkaConfig.ActorTimeout
|
||||||
|
|
|
||||||
|
|
@ -125,9 +125,7 @@ class Agent[T](initialValue: T, app: AkkaApplication) {
|
||||||
if (Stm.activeTransaction) {
|
if (Stm.activeTransaction) {
|
||||||
val result = new DefaultPromise[T](timeout)(app.dispatcher)
|
val result = new DefaultPromise[T](timeout)(app.dispatcher)
|
||||||
get //Join xa
|
get //Join xa
|
||||||
deferred {
|
deferred { result completeWith dispatch } //Attach deferred-block to current transaction
|
||||||
result completeWith dispatch
|
|
||||||
} //Attach deferred-block to current transaction
|
|
||||||
result
|
result
|
||||||
} else dispatch
|
} else dispatch
|
||||||
}
|
}
|
||||||
|
|
@ -288,7 +286,7 @@ class AgentUpdater[T](agent: Agent[T]) extends Actor {
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case update: Update[_] ⇒ sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] })
|
case update: Update[_] ⇒ sender.tell(atomic(txFactory) { agent.ref alter update.function.asInstanceOf[T ⇒ T] })
|
||||||
case Get ⇒ sender ! agent.get
|
case Get ⇒ sender.tell(agent.get)
|
||||||
case _ ⇒
|
case _ ⇒
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue