Sunday, July 31, 2011

Partially unexpected effects of chaining partial functions in Scala

People who learn Scala usually agree that pattern matching is a great feature that helps make code more expressive. Some time later, they also discover that a block of case clauses, like the one used in a match expression, can also be used on its own as a partial function. And since partial functions are full-blown functions, you can combine them in a couple of useful ways:



f1 orElse f2

Combines two partial functions into a new one: if f1 is not defined at the argument, it tries f2.

f1 andThen f2

Passes the result of f1 to f2. This means that the output type of f1 must be compatible with the input type of f2.
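A small illustration of both combinators, as a sketch; small, big and upper are made-up functions for this example:

```scala
object Basics {
  // Two made-up partial functions with non-overlapping domains.
  val small: PartialFunction[Int, String] = { case i if i < 10 => s"small $i" }
  val big: PartialFunction[Int, String] = { case i if i >= 100 => s"big $i" }

  // f1 orElse f2: tries `small` first and falls back to `big`.
  val sizes: PartialFunction[Int, String] = small orElse big

  // f1 andThen f2: feeds the String produced by `sizes` into `upper`,
  // so the output type of `sizes` has to match the input type of `upper`.
  val upper: String => String = _.toUpperCase
  val shout: PartialFunction[Int, String] = sizes andThen upper
}
```

Here sizes(3) gives "small 3" and sizes(500) gives "big 500", while neither sizes nor shout is defined at 50.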



orElse


As others have discovered, you can combine a list (TraversableOnce, really) of partial functions into one with reduce. What's less obvious, though, is that the way you combine them can have unexpected performance consequences.


To easily create a lot of partial functions to test, we'll write a higher-order function that generates them (if you're used to Java, you can call it a factory). Each generated partial function prints the number it is defined at whenever its guard is evaluated. Keep in mind that the guard runs not only in isDefinedAt but also in apply, so every line in the output below is one guard evaluation, whichever method it came from:



type PF = PartialFunction[Int, Int]
def genPF(defined: Int): PF = { case i if {println(defined); defined == i} => i }
val f123 = 1 to 3 map genPF reduceLeft(_ orElse _)


Let's try it:



> f123(1)
1
1
1
> f123(2)
1
2
1
2
> f123(3)
1
2
3

Wait, what? f123(1) evaluates the first guard three times, and f123(2) needs four guard evaluations to find a single match. It gets even worse with a bigger number of composed functions.



val f1to5 = 1 to 5 map genPF reduceLeft(_ orElse _)



> f1to5(2)
1
2
1
2
1
2
1
2
> f1to5(3)
1
2
3
1
2
3
1
2
3
> f1to5(4)
1
2
3
4
1
2
3
4

Let's take a closer look at the isDefinedAt and apply methods of the function created by orElse, as defined in the standard library:



def orElse[A1 <: A, B1 >: B](that: PartialFunction[A1, B1]): PartialFunction[A1, B1] =
  new PartialFunction[A1, B1] {
    def isDefinedAt(x: A1): Boolean =
      PartialFunction.this.isDefinedAt(x) || that.isDefinedAt(x)
    def apply(x: A1): B1 =
      if (PartialFunction.this.isDefinedAt(x)) PartialFunction.this.apply(x)
      else that.apply(x)
  }


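Note that apply itself only asks f1 whether it is defined at x. The trouble starts when the combined function is the left-hand side of another orElse: the outer apply first calls its isDefinedAt, which checks f1 (and f2, if f1 is not defined at x), and then calls its apply, which checks f1 again to decide which function to call. This means that isDefinedAt for f1 is called twice.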


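Given this, we can explain what happens here. reduceLeft builds ((f1 orElse f2) orElse f3) orElse ..., so every intermediate function is the left-hand side of the next orElse, and every level of nesting adds another full round of isDefinedAt checks... you know what happens when we do this again and again. Counting every guard evaluation, including the one in the final apply, we can fairly easily find out that there are k * (n - k + 1) of them, where n is the number of composed functions and k is the position of the first function that matches.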


Luckily, there is an easy solution to combine partial functions in a more efficient way. We can use reduceRight, where isDefinedAt for each composed function is invoked at most twice. Verifying this and finding out why is left as an exercise for the curious reader (as you undoubtedly are, since you're reading this).
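One way to check both the k * (n - k + 1) count and the reduceRight claim is to count guard evaluations directly. The sketch below re-creates the orElse definition quoted above in a class called LegacyOrElse, so that the numbers do not depend on how your Scala version implements orElse; Leaf plays the role of genPF but counts instead of printing. Both names are hypothetical.

```scala
object OrElseCost {
  // Total number of guard evaluations, in isDefinedAt and in apply alike.
  var guardEvaluations = 0

  // Stand-in for genPF: defined only at `defined`. Like a pattern-matching
  // literal, it evaluates its guard in both isDefinedAt and apply.
  final class Leaf(defined: Int) extends PartialFunction[Int, Int] {
    private def guard(i: Int): Boolean = { guardEvaluations += 1; i == defined }
    def isDefinedAt(i: Int): Boolean = guard(i)
    def apply(i: Int): Int = if (guard(i)) i else throw new MatchError(i)
  }

  // Re-creation of the orElse definition quoted in the post.
  final class LegacyOrElse(f1: PartialFunction[Int, Int], f2: PartialFunction[Int, Int])
      extends PartialFunction[Int, Int] {
    def isDefinedAt(x: Int): Boolean = f1.isDefinedAt(x) || f2.isDefinedAt(x)
    def apply(x: Int): Int = if (f1.isDefinedAt(x)) f1.apply(x) else f2.apply(x)
  }

  private def leaves(n: Int): Seq[PartialFunction[Int, Int]] = (1 to n).map(i => new Leaf(i))

  // ((f1 orElse f2) orElse f3) orElse ...
  def composedLeft(n: Int): PartialFunction[Int, Int] =
    leaves(n).reduceLeft[PartialFunction[Int, Int]]((a, b) => new LegacyOrElse(a, b))

  // f1 orElse (f2 orElse (f3 orElse ...))
  def composedRight(n: Int): PartialFunction[Int, Int] =
    leaves(n).reduceRight[PartialFunction[Int, Int]]((a, b) => new LegacyOrElse(a, b))

  // Guard evaluations caused by a single application of `f` to `x`.
  def cost(f: PartialFunction[Int, Int], x: Int): Int = {
    guardEvaluations = 0
    f(x)
    guardEvaluations
  }

  def main(args: Array[String]): Unit =
    for (k <- 1 to 5)
      println(s"k=$k reduceLeft=${cost(composedLeft(5), k)} reduceRight=${cost(composedRight(5), k)}")
}
```

With five functions, main reports 5, 8, 9, 8 and 5 guard evaluations for reduceLeft as k goes from 1 to 5, matching k * (n - k + 1), and 2, 3, 4, 5 and 5 for reduceRight.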




andThen


You would think that f1 andThen f2 should be defined only for the arguments where the result of f1 is something f2 is defined at:



val doubler: PartialFunction[Int,Int] = { case i if Set(1,2) contains i => i * 2 }
val f = doubler andThen doubler


Of course, that's not how it works. To find out whether the output of f1 is a valid input for f2, we would need to execute f1, and it's better not to do that in case it has side effects. So the combined function is defined wherever f1 is, which means that we cannot rely on calling isDefinedAt on it to avoid MatchErrors:



> f isDefinedAt 2
res3: Boolean = true
> f(2)
scala.MatchError: 4
...
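If you do need a composition that is defined only where the result of f1 is in turn acceptable to f2, one option is to go through Option with lift and turn the result back into a partial function with Function.unlift. A minimal sketch, where strictAndThen is a hypothetical helper; it pays exactly the price described above, since isDefinedAt has to run the first function, so that function should be free of side effects:

```scala
object StrictAndThen {
  val doubler: PartialFunction[Int, Int] = { case i if Set(1, 2) contains i => i * 2 }

  // Defined only where `first` is defined AND `second` accepts its result.
  // Checking this runs `first`, which is why `first` should have no side effects.
  def strictAndThen[A, B, C](first: PartialFunction[A, B],
                             second: PartialFunction[B, C]): PartialFunction[A, C] =
    Function.unlift((a: A) => first.lift(a).flatMap(second.lift))

  val f: PartialFunction[Int, Int] = strictAndThen(doubler, doubler)
}
```

Here f is defined at 1 (f(1) is 4) but not at 2, because doubler is not defined at 4.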

Conclusion: when you're looking for performance, it always helps to understand how the abstractions you're using decompose into simpler building blocks.


Friday, June 17, 2011

Testing actors in Scala

Probably the most frequent question people asked at Scala eXchange 2011 was how to test actors. I had long planned to write a blog post on this topic, so I took this as a sign that it was high time to get it done.



It seems that the main problem people have is that actors are asynchronous and this introduces non-determinism in their tests. How do you know when to check if the actor has received and processed the message? Do you just wait a certain number of seconds before checking? This would make tests unnecessarily slow.


Another problem I think folks have is that they don't know how to verify that an actor has sent a message to another actor. Developers are familiar with mocking objects to verify that a method has been called, but how do you mock an actor to verify it has received a certain message?


Finally, many people struggle with the fact that it's not easy (or at least not idiomatic) to check an actor's internal state. This is especially true for Akka actors, which are created using a factory method, so you can't simply call methods that muck with the actor's internals.


Let me address the last one first. How do you check that the internal state of an actor has changed? You don't! I find that actors follow the object-oriented principle of encapsulation even better than objects do. Relying on a particular internal state couples your tests unnecessarily to the implementation and makes them brittle. As Viktor Klang has pointed out, if the internal state of an actor cannot be observed from outside the actor, does it really matter what it is?


So how do you know when an actor has processed a message that was sent asynchronously? An easy way to eliminate non-determinism is to put the functionality in a separate method and call that method when the actor receives a message:



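import scala.actors.Actor

// placeholder message type
case class Message(arg1: String, arg2: Int)

object MyActor {
  // the actual functionality lives here, so it can be tested synchronously
  def computation(arg1: String, arg2: Int) = {
    ...
    result
  }
}

// the recipient is injected instead of being hardcoded
class MyActor(anotherActor: Actor) extends Actor {
  import MyActor._

  def act() {
    loop {
      react {
        case Message(arg1, arg2) =>
          anotherActor ! computation(arg1, arg2)
      }
    }
  }
}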


Then you can test the method the way you're used to. Putting the method in the companion object means that it can't depend on the actor's internal state, but it also means the approach should work with Akka actors as well. It also sidesteps the problem of checking whether the next actor in the chain has received the result.
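To make the idea concrete without depending on a particular actor library, here is a sketch in which a made-up computation (repeating a string) stands in for the elided one. handle is a hypothetical helper holding what the actor's react body would do; its send parameter stands in for anotherActor ! _, so a test can pass a function that simply records what would have been sent.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical message type for this sketch.
final case class Message(arg1: String, arg2: Int)

object MyActorLogic {
  // Made-up computation standing in for the elided one: pure, no actors involved.
  def computation(arg1: String, arg2: Int): String = arg1 * arg2

  // The body of the actor's handler; `send` plays the role of `anotherActor ! _`.
  def handle(msg: Message, send: Any => Unit): Unit = msg match {
    case Message(arg1, arg2) => send(computation(arg1, arg2))
  }
}

object MyActorLogicSpec {
  def main(args: Array[String]): Unit = {
    // Plain synchronous check of the computation.
    assert(MyActorLogic.computation("ab", 3) == "ababab")

    // A hand-rolled "mock recipient" that just records outgoing messages.
    val sent = ListBuffer.empty[Any]
    MyActorLogic.handle(Message("ab", 3), msg => sent += msg)
    assert(sent.toList == List("ababab"))
    println("all checks passed")
  }
}
```

Neither check needs a running actor or any waiting, so they are as fast and deterministic as any other unit test.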


Sometimes you cannot test a helper method and sometimes testing the method is not enough. In these cases you want to verify explicitly that an actor has sent a message to another actor as a result of receiving a certain trigger message. Here's another approach we use at Apache ESME, which works very well:



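import scala.actors.Actor

case object Wait

class ConductorActor extends Actor {
  def act {
    react {
      // once the test sends Wait, block until the expected message
      // arrives and send it back to the test as the reply
      case Wait => reply {
        receive {
          case MessageReceived(msg, reason) => msg
        }
      }
    }
  }
}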


What's going on here? We define a helper actor (the recipient) that is supposed to receive the message from the actor we want to test (the sender). Usually the sender we want to test doesn't send messages to a hardcoded recipient: it's a good idea to either inject the recipient as a constructor parameter or register it via a message representing a subscription request.


This actor uses a rarely seen nested syntax, which is only available with the actors in the standard Scala library. The recipient replies synchronously (which is what we want) with the received message, but only after we signal that we've already sent the trigger message to the sender. This implementation also relies on the fact that unhandled messages stay in the mailbox if there's no handler for them. While this can lead to memory leaks if these messages never get handled, it is a nice way to process out-of-order messages, which is something we take advantage of here. This is similar to selective receive in Erlang and is a fairly painless way to handle race conditions: it doesn't matter which message is received first.



These features are not present in Akka, but you could emulate nested handlers using become and keep unhandled messages using. An even better idea would be to use the built-in TestKit or the akka-expect project, which use the same technique in a less ad hoc manner (but, AFAIK, don't work with non-Akka actors).


So now the only thing we need to do is send the trigger message to the sender under test and then ask the recipient whether it has received the resulting message:



// wait till the message appears in the timeline
// or fail after 5 seconds
val msgReceived = conductor !? (5000L, Wait)

if (msgReceived.isEmpty) fail("no message received")


If the recipient gets the message within the timeout, the test passes; otherwise we time out and fail the test. The nice thing about this approach is that on the happy path the test continues as soon as the message arrives, without slowing down the test suite. The test is slowed down by the full timeout only when it is going to fail, which should be the exception.
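The core of the technique (wait for a matching message with a timeout, and keep non-matching messages around instead of failing on them) can also be sketched without an actor library. The Probe class below is a hypothetical helper, not the ESME code: it swaps the conductor actor for a java.util.concurrent.LinkedBlockingQueue, and its stash plays the role of the mailbox that keeps unhandled messages.

```scala
import java.util.concurrent.{LinkedBlockingQueue, TimeUnit}

// Hypothetical stand-in for the conductor actor, built on a blocking queue.
final class Probe[M] {
  private val inbox = new LinkedBlockingQueue[M]()
  // Messages that arrived but have not matched yet; only the test thread touches this.
  private var stash = Vector.empty[M]

  // Called by the code under test (from any thread) instead of `recipient ! msg`.
  def tell(msg: M): Unit = inbox.put(msg)

  // Returns as soon as a matching message is available, or None after the timeout.
  def expect[R](timeoutMillis: Long)(pf: PartialFunction[M, R]): Option[R] =
    stash.indexWhere(m => pf.isDefinedAt(m)) match {
      case i if i >= 0 =>
        // Selective receive: an earlier, out-of-order message can still match.
        val m = stash(i)
        stash = stash.patch(i, Nil, 1)
        Some(pf(m))
      case _ =>
        val deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutMillis)
        var result: Option[R] = None
        while (result.isEmpty && System.nanoTime() < deadline) {
          // poll returns null if nothing arrives before the deadline
          val next = Option(inbox.poll(deadline - System.nanoTime(), TimeUnit.NANOSECONDS))
          next.foreach { m =>
            if (pf.isDefinedAt(m)) result = Some(pf(m))
            else stash = stash :+ m // keep it for a later expect
          }
        }
        result
    }
}
```

In a test, the code under test would call probe.tell wherever it would have sent a message to the recipient, and the test would then call something like probe.expect(5000L) { case MessageReceived(msg, _) => msg }. On the happy path expect returns as soon as the message shows up; it waits the full five seconds only when the message never arrives.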



A minor inconvenience arises if the sender expects the recipient to be, say, a Lift actor rather than a Scala library actor. This is easily overcome with a bridge actor, which acts only as an intermediary and forwards the message to the designated recipient actor:



class BridgeActor(receiver: Actor) extends LiftActor {
  // forward MessageReceived notifications to the (Scala library) receiver
  protected def messageHandler = {
    case nm @ MessageReceived(_, _) => receiver ! nm
  }
}

val liftActor = new BridgeActor(conductor)

Distributor ! Listen(theUser.id.is, liftActor)
Distributor ! Listen(followerUser.id.is, liftActor)


Here we're injecting one actor as a constructor parameter and registering another via the Listen message.



Further research


If you're using Akka, your best bet is the recommended TestKit. Here's an article on how to use it:


http://roestenburg.agilesquad.com/2011/02/unit-testing-akka-actors-with-testkit_12.html


Another solution would be to use the akka-expect framework:


https://github.com/joda/akka-expect


A more universal library is Awaitility, which uses a similar solution, but with more general applicability to Java threads and Scala actors:



http://code.google.com/p/awaitility/


ScalaTest and Specs also have a conductor, which implements a similar idea of using a CountDownLatch to make actors deterministic:


http://www.scalatest.org/scaladoc/doc-1.0/org/scalatest/concurrent/Conductor.html