fixed bug in Agent.scala, fixed bug in RemoteClient.scala, fixed problem with tests

This commit is contained in:
Jonas Bonér 2010-04-09 11:16:38 +02:00
parent 1e789b3648
commit 6ed887fb7b
6 changed files with 80 additions and 93 deletions

View file

@ -90,7 +90,7 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
* *
* IMPORTANT: * IMPORTANT:
* You can *not* call 'agent.get', 'agent()' or use the monadic 'foreach', * You can *not* call 'agent.get', 'agent()' or use the monadic 'foreach',
* 'map and 'flatMap' within an enclosing transaction since that would block * 'map' and 'flatMap' within an enclosing transaction since that would block
* the transaction indefinitely. But all other operations are fine. The system * the transaction indefinitely. But all other operations are fine. The system
* will raise an error (e.g. *not* deadlock) if you try to do so, so as long as * will raise an error (e.g. *not* deadlock) if you try to do so, so as long as
* you test your application thoroughly you should be fine. * you test your application thoroughly you should be fine.
@ -99,11 +99,13 @@ class AgentException private[akka](message: String) extends RuntimeException(mes
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
sealed class Agent[T] private (initialValue: T) extends Transactor { sealed class Agent[T] private (initialValue: T) extends Transactor {
start
import Agent._ import Agent._
log.debug("Starting up Agent [%s]", _uuid)
private lazy val value = Ref[T]() private lazy val value = Ref[T]()
start this ! Value(initialValue)
this !! Value(initialValue)
/** /**
* Periodically handles incoming messages. * Periodically handles incoming messages.

View file

@ -168,7 +168,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = { private def donateMessage(receiver: Actor, thief: Actor): Option[MessageInvocation] = {
val donated = receiver._mailbox.pollLast val donated = receiver._mailbox.pollLast
if (donated != null) { if (donated != null) {
thief.forward(donated.message)(Some(donated.receiver)) thief ! donated.message
return Some(donated) return Some(donated)
} else return None } else return None
} }

View file

@ -200,7 +200,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
val channel = connection.awaitUninterruptibly.getChannel val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel) openChannels.add(channel)
if (!connection.isSuccess) { if (!connection.isSuccess) {
listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(connection.getCause)) listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(connection.getCause))
log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port) log.error(connection.getCause, "Remote client connection to [%s:%s] has failed", hostname, port)
} }
isRunning = true isRunning = true
@ -232,7 +232,7 @@ class RemoteClient(val hostname: String, val port: Int) extends Logging {
} }
} else { } else {
val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.") val exception = new IllegalStateException("Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.")
listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception)) listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception))
throw exception throw exception
} }
@ -325,12 +325,12 @@ class RemoteClientHandler(val name: String,
futures.remove(reply.getId) futures.remove(reply.getId)
} else { } else {
val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result) val exception = new IllegalArgumentException("Unknown message received in remote client handler: " + result)
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(exception)) client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(exception))
throw exception throw exception
} }
} catch { } catch {
case e: Exception => case e: Exception =>
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(e)) client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(e))
log.error("Unexpected exception in remote client handler: %s", e) log.error("Unexpected exception in remote client handler: %s", e)
throw e throw e
} }
@ -345,7 +345,7 @@ class RemoteClientHandler(val name: String,
// Wait until the connection attempt succeeds or fails. // Wait until the connection attempt succeeds or fails.
client.connection.awaitUninterruptibly client.connection.awaitUninterruptibly
if (!client.connection.isSuccess) { if (!client.connection.isSuccess) {
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(client.connection.getCause)) client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(client.connection.getCause))
log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress) log.error(client.connection.getCause, "Reconnection to [%s] has failed", remoteAddress)
} }
} }
@ -353,17 +353,17 @@ class RemoteClientHandler(val name: String,
} }
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientConnected(client.hostname, client.port)) client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientConnected(client.hostname, client.port))
log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress) log.debug("Remote client connected to [%s]", ctx.getChannel.getRemoteAddress)
} }
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientDisconnected(client.hostname, client.port)) client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientDisconnected(client.hostname, client.port))
log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress) log.debug("Remote client disconnected from [%s]", ctx.getChannel.getRemoteAddress)
} }
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.listeners.toArray.asInstanceOf[Array[Actor]].foreach(_ ! RemoteClientError(event.getCause)) client.listeners.toArray.foreach(l => l.asInstanceOf[Actor] ! RemoteClientError(event.getCause))
log.error(event.getCause, "Unexpected exception from downstream in remote client") log.error(event.getCause, "Unexpected exception from downstream in remote client")
event.getChannel.close event.getChannel.close
} }

View file

@ -1,6 +1,5 @@
package se.scalablesolutions.akka.actor package se.scalablesolutions.akka.actor
import _root_.java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.Actor.transactor import se.scalablesolutions.akka.actor.Actor.transactor
import se.scalablesolutions.akka.stm.Transaction.Global.atomic import se.scalablesolutions.akka.stm.Transaction.Global.atomic
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
@ -10,51 +9,40 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import org.junit.runner.RunWith import org.junit.runner.RunWith
import org.junit.{Test} import org.junit.Test
import java.util.concurrent.CountDownLatch import java.util.concurrent.{TimeUnit, CountDownLatch}
@RunWith(classOf[JUnitRunner]) @RunWith(classOf[JUnitRunner])
class AgentSpec extends junit.framework.TestCase class AgentSpec extends junit.framework.TestCase with Suite with MustMatchers {
with Suite with MustMatchers
with ActorTestUtil with Logging {
@Test def testSendFun = verify(new TestActor { @Test def testSendFun = {
def test = {
val agent = Agent(5) val agent = Agent(5)
handle(agent) {
agent send (_ + 1) agent send (_ + 1)
agent send (_ * 2) agent send (_ * 2)
val result = agent() val result = agent()
result must be(12) result must be(12)
agent.stop
} }
}
})
@Test def testSendValue = verify(new TestActor { @Test def testSendValue = {
def test = {
val agent = Agent(5) val agent = Agent(5)
handle(agent) {
agent send 6 agent send 6
val result = agent() val result = agent()
result must be(6) result must be(6)
agent.stop
} }
}
})
@Test def testSendProc = verify(new TestActor { @Test def testSendProc = {
def test = {
val agent = Agent(5) val agent = Agent(5)
var result = 0 var result = 0
val latch = new CountDownLatch(2) val latch = new CountDownLatch(2)
handle(agent) {
agent sendProc { e => result += e; latch.countDown } agent sendProc { e => result += e; latch.countDown }
agent sendProc { e => result += e; latch.countDown } agent sendProc { e => result += e; latch.countDown }
assert(latch.await(1, TimeUnit.SECONDS)) assert(latch.await(5, TimeUnit.SECONDS))
result must be(10) result must be(10)
agent.stop
} }
}
})
@Test def testOneAgentsendWithinEnlosingTransactionSuccess = { @Test def testOneAgentsendWithinEnlosingTransactionSuccess = {
case object Go case object Go
@ -64,7 +52,7 @@ with ActorTestUtil with Logging {
case Go => agent send { e => latch.countDown; e + 1 } case Go => agent send { e => latch.countDown; e + 1 }
} }
tx ! Go tx ! Go
assert(latch.await(1, TimeUnit.SECONDS)) assert(latch.await(5, TimeUnit.SECONDS))
val result = agent() val result = agent()
result must be(6) result must be(6)
agent.close agent.close
@ -84,14 +72,13 @@ with ActorTestUtil with Logging {
} }
} }
tx ! Go tx ! Go
assert(latch.await(1, TimeUnit.SECONDS)) assert(latch.await(5, TimeUnit.SECONDS))
agent.close agent.close
tx.stop tx.stop
assert(true) assert(true)
} }
@Test def testAgentForeach = verify(new TestActor { @Test def testAgentForeach = {
def test = {
val agent1 = Agent(3) val agent1 = Agent(3)
var result = 0 var result = 0
for (first <- agent1) { for (first <- agent1) {
@ -100,20 +87,16 @@ with ActorTestUtil with Logging {
result must be(4) result must be(4)
agent1.close agent1.close
} }
})
@Test def testAgentMap = verify(new TestActor { @Test def testAgentMap = {
def test = {
val agent1 = Agent(3) val agent1 = Agent(3)
val result = for (first <- agent1) yield first + 1 val result = for (first <- agent1) yield first + 1
result() must be(4) result() must be(4)
result.close result.close
agent1.close agent1.close
} }
})
@Test def testAgentFlatMap = verify(new TestActor { @Test def testAgentFlatMap = {
def test = {
val agent1 = Agent(3) val agent1 = Agent(3)
val agent2 = Agent(5) val agent2 = Agent(5)
val result = for { val result = for {
@ -125,5 +108,4 @@ with ActorTestUtil with Logging {
agent1.close agent1.close
agent2.close agent2.close
} }
})
} }

View file

@ -1,11 +1,14 @@
package se.scalablesolutions.akka.actor package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import java.util.concurrent.CountDownLatch
import org.scalatest.matchers.MustMatchers import org.scalatest.matchers.MustMatchers
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers import se.scalablesolutions.akka.dispatch.Dispatchers
import java.util.concurrent.{TimeUnit, CountDownLatch}
/** /**
* @author Jan Van Besien * @author Jan Van Besien
*/ */
@ -51,7 +54,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherSpec extends JUnitSuite with
slow ! i slow ! i
} }
finishedCounter.await finishedCounter.await(5, TimeUnit.SECONDS)
fast.invocationCount must be > (slow.invocationCount) fast.invocationCount must be > (slow.invocationCount)
} }
} }

View file

@ -59,7 +59,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
Path.fromFile(home) Path.fromFile(home)
} }
val encodingUtf8 = List("-encoding", "UTF-8") val encodingUtf8 = List("-encoding", "UTF-8")
override def parallelExecution = true override def parallelExecution = false
lazy val deployPath = akkaHome / "deploy" lazy val deployPath = akkaHome / "deploy"
lazy val distPath = akkaHome / "dist" lazy val distPath = akkaHome / "dist"