Fail fast on null messages #23796
This commit is contained in:
parent
bcce7bd8c1
commit
be1e25f4d1
10 changed files with 74 additions and 11 deletions
|
|
@ -6,8 +6,9 @@ package akka.typed
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
import akka.actor.DeadLetterSuppression
|
import akka.actor.{ DeadLetterSuppression, InvalidMessageException }
|
||||||
import akka.typed.scaladsl.Actor
|
import akka.typed.scaladsl.Actor
|
||||||
|
|
||||||
import scala.language.existentials
|
import scala.language.existentials
|
||||||
|
|
||||||
object ActorContextSpec {
|
object ActorContextSpec {
|
||||||
|
|
@ -620,6 +621,13 @@ class ActorContextSpec extends TypedSpec(ConfigFactory.parseString(
|
||||||
adapter.path.name should include("named")
|
adapter.path.name should include("named")
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
def `42 must not allow null messages`(): Unit = sync(setup("ctx42") { (ctx, startWith) ⇒
|
||||||
|
startWith.keep { subj ⇒
|
||||||
|
intercept[InvalidMessageException] {
|
||||||
|
subj ! null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
trait Normal extends Tests {
|
trait Normal extends Tests {
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ package akka.typed
|
||||||
package internal
|
package internal
|
||||||
|
|
||||||
import akka.Done
|
import akka.Done
|
||||||
|
import akka.actor.InvalidMessageException
|
||||||
import akka.typed.scaladsl.Actor
|
import akka.typed.scaladsl.Actor
|
||||||
import akka.typed.scaladsl.Actor._
|
import akka.typed.scaladsl.Actor._
|
||||||
import akka.typed.testkit.Inbox
|
import akka.typed.testkit.Inbox
|
||||||
|
|
@ -97,6 +98,14 @@ class ActorSystemSpec extends Spec with Matchers with BeforeAndAfterAll with Sca
|
||||||
f.futureValue should ===(42)
|
f.futureValue should ===(42)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def `must not allow null messages`(): Unit = {
|
||||||
|
withSystem("null-messages", Actor.empty[String]) { sys ⇒
|
||||||
|
intercept[InvalidMessageException] {
|
||||||
|
sys ! null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
object `An ActorSystemImpl` extends CommonTests {
|
object `An ActorSystemImpl` extends CommonTests {
|
||||||
|
|
|
||||||
|
|
@ -4,7 +4,9 @@
|
||||||
package akka.typed
|
package akka.typed
|
||||||
package internal
|
package internal
|
||||||
|
|
||||||
import scala.concurrent.{ Promise, Future }
|
import akka.actor.InvalidMessageException
|
||||||
|
|
||||||
|
import scala.concurrent.{ Future, Promise }
|
||||||
|
|
||||||
class FunctionRefSpec extends TypedSpecSetup {
|
class FunctionRefSpec extends TypedSpecSetup {
|
||||||
|
|
||||||
|
|
@ -159,6 +161,16 @@ class FunctionRefSpec extends TypedSpecSetup {
|
||||||
client1.hasSomething should ===(false)
|
client1.hasSomething should ===(false)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
def `must not allow null messages`(): Unit = {
|
||||||
|
val p = Promise[ActorRef[String]]
|
||||||
|
val ref = ActorRef(p.future)
|
||||||
|
|
||||||
|
intercept[InvalidMessageException] {
|
||||||
|
ref ! null
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,7 +6,7 @@ package akka.typed.scaladsl.adapter
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
import scala.util.control.NoStackTrace
|
import scala.util.control.NoStackTrace
|
||||||
import akka.typed.ActorRef
|
import akka.typed.ActorRef
|
||||||
import akka.actor.Props
|
import akka.actor.{ InvalidMessageException, Props }
|
||||||
import akka.typed.Behavior
|
import akka.typed.Behavior
|
||||||
import akka.typed.Terminated
|
import akka.typed.Terminated
|
||||||
import akka.typed.scaladsl.Actor
|
import akka.typed.scaladsl.Actor
|
||||||
|
|
@ -169,6 +169,15 @@ class AdapterSpec extends AkkaSpec {
|
||||||
probe.expectMsg("ok")
|
probe.expectMsg("ok")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"not send null message from typed to untyped" in {
|
||||||
|
val probe = TestProbe()
|
||||||
|
val untypedRef = system.actorOf(untyped1)
|
||||||
|
val typedRef = system.spawnAnonymous(typed1(untypedRef, probe.ref))
|
||||||
|
intercept[InvalidMessageException] {
|
||||||
|
typedRef ! null
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
"send message from untyped to typed" in {
|
"send message from untyped to typed" in {
|
||||||
val probe = TestProbe()
|
val probe = TestProbe()
|
||||||
val typedRef = system.spawnAnonymous(typed2)
|
val typedRef = system.spawnAnonymous(typed2)
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,8 @@
|
||||||
*/
|
*/
|
||||||
package akka.typed
|
package akka.typed
|
||||||
|
|
||||||
|
import akka.actor.InvalidMessageException
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import akka.util.LineNumbers
|
import akka.util.LineNumbers
|
||||||
import akka.annotation.{ DoNotInherit, InternalApi }
|
import akka.annotation.{ DoNotInherit, InternalApi }
|
||||||
|
|
@ -275,6 +277,7 @@ object Behavior {
|
||||||
|
|
||||||
private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] =
|
private def interpret[T](behavior: Behavior[T], ctx: ActorContext[T], msg: Any): Behavior[T] =
|
||||||
behavior match {
|
behavior match {
|
||||||
|
case null ⇒ throw new InvalidMessageException("[null] is not an allowed message")
|
||||||
case SameBehavior | UnhandledBehavior ⇒
|
case SameBehavior | UnhandledBehavior ⇒
|
||||||
throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
|
throw new IllegalArgumentException(s"cannot execute with [$behavior] as behavior")
|
||||||
case _: UntypedBehavior[_] ⇒
|
case _: UntypedBehavior[_] ⇒
|
||||||
|
|
|
||||||
|
|
@ -4,15 +4,17 @@
|
||||||
package akka.typed
|
package akka.typed
|
||||||
package internal
|
package internal
|
||||||
|
|
||||||
import akka.actor.InvalidActorNameException
|
import akka.actor.{ Cancellable, InvalidActorNameException, InvalidMessageException }
|
||||||
import akka.util.Helpers
|
import akka.util.Helpers
|
||||||
|
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
import akka.dispatch.ExecutionContexts
|
import akka.dispatch.ExecutionContexts
|
||||||
|
|
||||||
import scala.concurrent.ExecutionContextExecutor
|
import scala.concurrent.ExecutionContextExecutor
|
||||||
import akka.actor.Cancellable
|
|
||||||
import akka.util.Unsafe.{ instance ⇒ unsafe }
|
import akka.util.Unsafe.{ instance ⇒ unsafe }
|
||||||
import java.util.concurrent.ConcurrentLinkedQueue
|
import java.util.concurrent.ConcurrentLinkedQueue
|
||||||
import java.util.Queue
|
import java.util.Queue
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import scala.util.control.Exception.Catcher
|
import scala.util.control.Exception.Catcher
|
||||||
|
|
@ -198,7 +200,8 @@ private[typed] class ActorCell[T](
|
||||||
publish(Error(e, self.path.toString, getClass, "swallowing exception during message send"))
|
publish(Error(e, self.path.toString, getClass, "swallowing exception during message send"))
|
||||||
}
|
}
|
||||||
|
|
||||||
def send(msg: T): Unit =
|
def send(msg: T): Unit = {
|
||||||
|
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
|
||||||
try {
|
try {
|
||||||
val old = unsafe.getAndAddInt(this, statusOffset, 1)
|
val old = unsafe.getAndAddInt(this, statusOffset, 1)
|
||||||
val oldActivations = activations(old)
|
val oldActivations = activations(old)
|
||||||
|
|
@ -224,6 +227,7 @@ private[typed] class ActorCell[T](
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch handleException
|
} catch handleException
|
||||||
|
}
|
||||||
|
|
||||||
def sendSystem(signal: SystemMessage): Unit = {
|
def sendSystem(signal: SystemMessage): Unit = {
|
||||||
@tailrec def needToActivate(): Boolean = {
|
@tailrec def needToActivate(): Boolean = {
|
||||||
|
|
|
||||||
|
|
@ -7,11 +7,15 @@ package internal
|
||||||
import akka.{ actor ⇒ a }
|
import akka.{ actor ⇒ a }
|
||||||
import akka.dispatch.sysmsg._
|
import akka.dispatch.sysmsg._
|
||||||
import akka.util.Unsafe.{ instance ⇒ unsafe }
|
import akka.util.Unsafe.{ instance ⇒ unsafe }
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.util.control.NonFatal
|
import scala.util.control.NonFatal
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import java.util.ArrayList
|
import java.util.ArrayList
|
||||||
import scala.util.{ Success, Failure }
|
|
||||||
|
import akka.actor.InvalidMessageException
|
||||||
|
|
||||||
|
import scala.util.{ Failure, Success }
|
||||||
import scala.annotation.unchecked.uncheckedVariance
|
import scala.annotation.unchecked.uncheckedVariance
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -85,12 +89,14 @@ private[typed] final class FunctionRef[-T](
|
||||||
_terminate: FunctionRef[T] ⇒ Unit)
|
_terminate: FunctionRef[T] ⇒ Unit)
|
||||||
extends WatchableRef[T](_path) {
|
extends WatchableRef[T](_path) {
|
||||||
|
|
||||||
override def tell(msg: T): Unit =
|
override def tell(msg: T): Unit = {
|
||||||
|
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
|
||||||
if (isAlive)
|
if (isAlive)
|
||||||
try send(msg, this) catch {
|
try send(msg, this) catch {
|
||||||
case NonFatal(ex) ⇒ // nothing we can do here
|
case NonFatal(ex) ⇒ // nothing we can do here
|
||||||
}
|
}
|
||||||
else () // we don’t have deadLetters available
|
else () // we don’t have deadLetters available
|
||||||
|
}
|
||||||
|
|
||||||
override def sendSystem(signal: SystemMessage): Unit = signal match {
|
override def sendSystem(signal: SystemMessage): Unit = signal match {
|
||||||
case Create() ⇒ // nothing to do
|
case Create() ⇒ // nothing to do
|
||||||
|
|
@ -193,7 +199,8 @@ private[typed] class FutureRef[-T](_path: a.ActorPath, bufferSize: Int, f: Futur
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
override def tell(msg: T): Unit =
|
override def tell(msg: T): Unit = {
|
||||||
|
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
|
||||||
_target match {
|
_target match {
|
||||||
case Left(list) ⇒
|
case Left(list) ⇒
|
||||||
list.synchronized {
|
list.synchronized {
|
||||||
|
|
@ -202,6 +209,7 @@ private[typed] class FutureRef[-T](_path: a.ActorPath, bufferSize: Int, f: Futur
|
||||||
}
|
}
|
||||||
case Right(ref) ⇒ ref ! msg
|
case Right(ref) ⇒ ref ! msg
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
override def sendSystem(signal: SystemMessage): Unit = signal match {
|
override def sendSystem(signal: SystemMessage): Unit = signal match {
|
||||||
case Create() ⇒ // nothing to do
|
case Create() ⇒ // nothing to do
|
||||||
|
|
|
||||||
|
|
@ -4,11 +4,13 @@
|
||||||
package akka.typed
|
package akka.typed
|
||||||
package internal
|
package internal
|
||||||
|
|
||||||
|
import akka.actor.InvalidMessageException
|
||||||
import akka.util.LineNumbers
|
import akka.util.LineNumbers
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.typed.{ ActorContext ⇒ AC }
|
import akka.typed.{ ActorContext ⇒ AC }
|
||||||
import akka.typed.scaladsl.{ ActorContext ⇒ SAC }
|
import akka.typed.scaladsl.{ ActorContext ⇒ SAC }
|
||||||
import akka.typed.scaladsl.Actor
|
import akka.typed.scaladsl.Actor
|
||||||
|
|
||||||
import scala.reflect.ClassTag
|
import scala.reflect.ClassTag
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ package akka.typed
|
||||||
package internal
|
package internal
|
||||||
package adapter
|
package adapter
|
||||||
|
|
||||||
|
import akka.actor.InvalidMessageException
|
||||||
import akka.{ actor ⇒ a }
|
import akka.{ actor ⇒ a }
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.dispatch.sysmsg
|
import akka.dispatch.sysmsg
|
||||||
|
|
@ -16,7 +17,10 @@ import akka.dispatch.sysmsg
|
||||||
extends ActorRef[T] with internal.ActorRefImpl[T] {
|
extends ActorRef[T] with internal.ActorRefImpl[T] {
|
||||||
|
|
||||||
override def path: a.ActorPath = untyped.path
|
override def path: a.ActorPath = untyped.path
|
||||||
override def tell(msg: T): Unit = untyped ! msg
|
override def tell(msg: T): Unit = {
|
||||||
|
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
|
||||||
|
untyped ! msg
|
||||||
|
}
|
||||||
override def isLocal: Boolean = untyped.isLocal
|
override def isLocal: Boolean = untyped.isLocal
|
||||||
override def sendSystem(signal: internal.SystemMessage): Unit =
|
override def sendSystem(signal: internal.SystemMessage): Unit =
|
||||||
ActorRefAdapter.sendSystemMessage(untyped, signal)
|
ActorRefAdapter.sendSystemMessage(untyped, signal)
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ package akka.typed
|
||||||
package internal
|
package internal
|
||||||
package adapter
|
package adapter
|
||||||
|
|
||||||
|
import akka.actor.InvalidMessageException
|
||||||
import akka.{ actor ⇒ a }
|
import akka.{ actor ⇒ a }
|
||||||
|
|
||||||
import scala.concurrent.ExecutionContextExecutor
|
import scala.concurrent.ExecutionContextExecutor
|
||||||
|
|
@ -26,7 +27,10 @@ import akka.annotation.InternalApi
|
||||||
import ActorRefAdapter.sendSystemMessage
|
import ActorRefAdapter.sendSystemMessage
|
||||||
|
|
||||||
// Members declared in akka.typed.ActorRef
|
// Members declared in akka.typed.ActorRef
|
||||||
override def tell(msg: T): Unit = untyped.guardian ! msg
|
override def tell(msg: T): Unit = {
|
||||||
|
if (msg == null) throw new InvalidMessageException("[null] is not an allowed message")
|
||||||
|
untyped.guardian ! msg
|
||||||
|
}
|
||||||
override def isLocal: Boolean = true
|
override def isLocal: Boolean = true
|
||||||
override def sendSystem(signal: internal.SystemMessage): Unit = sendSystemMessage(untyped.guardian, signal)
|
override def sendSystem(signal: internal.SystemMessage): Unit = sendSystemMessage(untyped.guardian, signal)
|
||||||
final override val path: a.ActorPath = a.RootActorPath(a.Address("akka", untyped.name)) / "user"
|
final override val path: a.ActorPath = a.RootActorPath(a.Address("akka", untyped.name)) / "user"
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue