The problem: Simple constructor injection in a multi-threaded environment may cause memory consistency issues.
When using simple constructor injection together with concurrency, care must be taken: objects constructed at startup in one thread may not be completely set up when they are read by components running in other threads. In multi-threaded applications this is a very common scenario, since the components are usually wired together at startup and used later from disparate components which may run in their own threads. Spring and Guice take care of this for us. But if we use straight Scala, we are responsible for handling this problem.
Consider the following example: We are building a messaging system with three components which are initialized at startup: a UserMailbox component to store messages, a User component, and a Mailer component to send mail such as “You have mail!”. They are all constructed in main, followed by an immediate call to UserMailbox. The crucial point is that one of the components, UserMailbox, depends on the other two, and that it is called immediately after construction, as shown in the code below. As long as there are no additional threads, this works:
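// A case class, so that printing a user yields User(john@smith.com)
case class User(userId: String)
class Mailer {
  // Prints a notification instead of sending real mail
  def sendYouHaveMail(user: User): Unit = println(s"Sending mail to $user")
}
// UserMailbox receives both of its dependencies through its constructor
class UserMailbox(user: User, mailer: Mailer) {
  def processNewMessages(): Unit = {
    mailer.sendYouHaveMail(user)
  }
}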
object MyApp extends App {
  // Wire the components together, then use them right away on the same thread
  val user = new User("john@smith.com")
  val mailer = new Mailer()
  val mailboxes = new UserMailbox(user, mailer)
  mailboxes.processNewMessages()
}
This produces the expected output:
Sending mail to User(john@smith.com)
But as soon as we introduce additional threads, we run into trouble. Let’s modify the code so that the user mailbox runs as an Actor:
import akka.actor.{Actor, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
// Request and reply messages exchanged with the mailbox actor
case class ProcessNewMessages()
case class ProcessedNewMessages()
class UserMailboxActor(user: User, mailer: Mailer) extends Actor {
  override def receive: Receive = {
    case _: ProcessNewMessages =>
      mailer.sendYouHaveMail(user)
      println(s"Processing message in thread ${Thread.currentThread().getId}, user=$user")
      sender() ! ProcessedNewMessages()
    case _ => // ignore any other message
  }
}
object MyApp2 extends App {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  implicit val timeout = Timeout(1.second)
  val user = new User("john@smith.com")
  println(s"Initialized user in thread ${Thread.currentThread().getId}, user=$user")
  val mailer = new Mailer()
  val mailboxActor = system.actorOf(Props(classOf[UserMailboxActor], user, mailer))
  val result = Await.result(mailboxActor ? ProcessNewMessages(), 10.seconds)
  println(s"result=$result")
  system.terminate() // shut the actor system down so that the JVM can exit
}
This will produce the following output:
Initialized user in thread 1, user=User(john@smith.com)
Sending mail to User(john@smith.com)
Processing message in thread 15, user=User(john@smith.com)
result=ProcessedNewMessages()
What has just happened here? Akka actors do not run on the thread that created them; a dispatcher schedules them on its own pool of threads. So the actor code runs in a different thread from main, and the thread id printed in main differs from the thread id printed by the actor, as the output above shows. User and Mailer were initialized in one thread and used from another without any concurrency management mechanism in our own code. This is a big “no no” in the JVM. We must never do this, because a thread that reads a reference published by another thread without synchronization may see an object that is not fully initialized. In JVM concurrency management this is called a memory consistency problem.
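The thread switch described above can also be observed without Akka. The following is a minimal sketch, assuming a scala.concurrent.Future on the global execution context as a stand-in for the actor; DemoUser and ThreadHandoffDemo are hypothetical names that are not part of the article's code. It only demonstrates that the object is constructed on one thread and used on another. It makes no attempt to reproduce a visibility failure, which would be timing-dependent.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical stand-in for the article's User component.
final case class DemoUser(userId: String)

object ThreadHandoffDemo {
  // Returns (id of the constructing thread, id of the thread that used the object).
  def run(): (Long, Long) = {
    val user = DemoUser("john@smith.com") // constructed on the calling thread
    val callerId = Thread.currentThread().getId
    // The Future body is scheduled on the global execution context's thread pool,
    // much like an actor's receive is scheduled on a dispatcher thread.
    val workerId = Future {
      println(s"Using $user in thread ${Thread.currentThread().getId}")
      Thread.currentThread().getId
    }
    (callerId, Await.result(workerId, 5.seconds))
  }

  def main(args: Array[String]): Unit = {
    val (callerId, workerId) = run()
    println(s"constructed in thread $callerId, used in thread $workerId")
  }
}
```

When run from the main thread, the two ids should differ, mirroring the two thread ids in the actor output.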
The solution:
Well, we could synchronize somehow, but locking is one of the more expensive forms of concurrency management: it relies on mutex locks, which also act as memory barriers, and threads block for as long as another thread holds the lock. Is there anything slicker than that?
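For comparison, here is roughly what the lock-based route looks like. This is only a sketch under assumptions: SynchronizedSlot and SynchronizedSlotDemo are hypothetical names, and the article's own code does not use them. Every read and write goes through the same monitor, so a value written by one thread is guaranteed to become visible to another, at the cost of taking the lock on each access.

```scala
// Hypothetical holder: every read and write goes through the same lock.
final class SynchronizedSlot[A] {
  private var value: Option[A] = None
  def set(a: A): Unit = this.synchronized { value = Some(a) }
  def get: Option[A] = this.synchronized { value }
}

object SynchronizedSlotDemo {
  // Writes on the calling thread, reads on a second thread, returns what the reader saw.
  def demo(): String = {
    val slot = new SynchronizedSlot[String]
    val seen = new SynchronizedSlot[String] // carries the reader's result back
    val reader = new Thread(new Runnable {
      def run(): Unit = {
        // Poll until the writer's value becomes visible through the lock.
        while (slot.get.isEmpty) Thread.sleep(1)
        seen.set(slot.get.get)
      }
    })
    reader.start()
    slot.set("john@smith.com") // written by the calling thread after the reader started
    reader.join()
    seen.get.get
  }

  def main(args: Array[String]): Unit =
    println(s"reader saw ${demo()}")
}
```

The polling loop terminates because the synchronized read is guaranteed to observe the synchronized write; with a plain unsynchronized field there would be no such guarantee.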
We do know that one way Scala code avoids having to synchronize is that vals initialized in a constructor are safe to read from other threads once the constructor has finished initializing the object. So we can fix this problem by moving the initialization into a separate class, AppModule:
class AppModule {
  implicit val system = ActorSystem()
  implicit val materializer = ActorMaterializer()
  private val user = new User("john@smith.com")
  println(s"Initialized user in thread ${Thread.currentThread().getId}, user=$user")
  private val mailer = new Mailer()
  val mailboxActor = system.actorOf(Props(classOf[UserMailboxActor], user, mailer))
}
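object MyApp3 extends App {
  implicit val timeout = Timeout(1.second)
  // All components are created inside AppModule's constructor
  val module = new AppModule
  val result = Await.result(module.mailboxActor ? ProcessNewMessages(), 10.seconds)
  println(s"result=$result")
  module.system.terminate() // shut the actor system down so that the JVM can exit
}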
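Let’s see how Scala can give this guarantee in terms of the JVM’s concurrency rules. When we decompile the class file generated for AppModule, the output shows that our vals are turned into final fields, each with an accessor method. (The listing is of AppModule$, the singleton form the compiler emits when the module is declared as an object; the vals of a plain class are compiled to private final fields in the same way.) See below: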
public final class AppModule$ {
  public static final com.kornelcsaszar.articles.diAndConcurrency.AppModule$ MODULE$;
  private final akka.actor.ActorSystem system;
  private final akka.stream.ActorMaterializer materializer;
  private final com.kornelcsaszar.articles.diAndConcurrency.User user;
  private final com.kornelcsaszar.articles.diAndConcurrency.Mailer mailer;
  private final akka.actor.ActorRef mailboxActor;
  public akka.actor.ActorSystem system() { /* compiled code */ }
  public akka.stream.ActorMaterializer materializer() { /* compiled code */ }
  private com.kornelcsaszar.articles.diAndConcurrency.User user() { /* compiled code */ }
  private com.kornelcsaszar.articles.diAndConcurrency.Mailer mailer() { /* compiled code */ }
  public akka.actor.ActorRef mailboxActor() { /* compiled code */ }
  private AppModule$() { /* compiled code */ }
}
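This is what we were looking for. The references in question are all compiled to final fields that are assigned in the constructor. The JVM guarantees that once the constructor has finished, any thread that obtains a reference to the object sees the correctly initialized values of its final fields, provided the reference does not leak to other threads during construction. As the documentation from Oracle describes, final fields are an important and useful exception to the rule that we must synchronize: https://docs.oracle.com/javase/tutorial/essential/concurrency/syncmeth.html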
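The final modifier can also be checked at runtime with Java reflection, without a decompiler. The sketch below assumes a Scala 2 compiler; Sample and FinalFieldCheck are hypothetical names introduced only for illustration. It compares a val with a var in an ordinary class and reports which of the generated fields the JVM treats as final.

```scala
import java.lang.reflect.Modifier

// Hypothetical class with one val and one var, for comparison.
class Sample {
  val fixed: String = "set once in the constructor"
  var mutable: Int = 0
}

object FinalFieldCheck {
  // Maps each declared field name to whether the JVM sees it as final.
  def finalFlags(cls: Class[_]): Map[String, Boolean] =
    cls.getDeclaredFields
      .map(f => f.getName -> Modifier.isFinal(f.getModifiers))
      .toMap

  def main(args: Array[String]): Unit =
    finalFlags(classOf[Sample]).foreach { case (name, isFinal) =>
      println(s"$name: final=$isFinal")
    }
}
```

The field backing the val should be reported as final and the field backing the var as non-final, which is the distinction the final-field guarantee depends on.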
Now the processing of the ProcessNewMessages message is safe because by the time the call is made, the AppModule reference will point to a fully initialized object that is consistently visible from all threads.
Conclusion
When we use constructor injection and the components run in different threads, we must make sure that we satisfy the JVM’s concurrency management requirements, because the components are initialized in one thread and used in another. Scala’s guarantee that vals assigned during construction can be safely accessed after the object is created provides a convenient and efficient way to make our code thread safe.