Monday, April 12, 2010

MailboxProcessors: Erlang-style concurrency in F#

Erlang is a language designed by Ericsson for one primary application domain: distributed, fault-tolerant applications. The basic unit of concurrency in Erlang is the lightweight process. These processes have nothing in common with OS processes; they are closer to green threads. Processes communicate by asynchronous, share-nothing message passing: every process has an associated mailbox holding messages not yet consumed, and messages can be retrieved from the mailbox in an order other than the order in which they arrived. Creating and destroying a process is very fast, and the number of coexisting processes in a running system can be large (up to 20 million). An essential feature is the ability of one process to monitor the state of another, so processes can form a hierarchy of workers and supervisors.

The concurrency style used in Erlang is very natural to the human pattern of thought (it is well described in Chapter 7, "Concurrency", of "Programming Erlang: Software for a Concurrent World" by Joe Armstrong), so it is not surprising that libraries emulating this style exist in many languages: Scala Actors, Retlang (C#). Axum (an MS Research incubation project) is based on the actor model. F# is no exception: its standard library includes MailboxProcessor, the hero of today's post.

MailboxProcessor is a combination of a message queue and a message handler, built on top of the Async infrastructure. It is usually created with the static MailboxProcessor.Start method, though you can also invoke the constructor directly. The static Start method is just a shortcut that calls the constructor and then the instance Start method. The instance Start method starts executing the handler on a ThreadPool thread. Parts (// 1) and (// 2) in the sample below are equivalent.

let body = fun (_ : MailboxProcessor<int>) -> async {return ()}

// 1
MailboxProcessor.Start(body)

// 2
let mbp = new MailboxProcessor<_>(body)
mbp.Start()

Further samples use form (1) to keep the declarations short. Both the static Start method and the constructor accept an optional cancellationToken parameter that cancels execution of the handler body (the instance Start method itself takes no arguments).
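A minimal sketch of passing a token to the static Start method, assuming a reasonably recent FSharp.Core. The names cts, cancelled and agent are illustrative. The handler registers a callback with Async.OnCancel, which fires when the handler's token is cancelled; a PostAndReply round trip first makes sure the handler is running and the callback is registered before we cancel.

```fsharp
open System.Threading

let cts = new CancellationTokenSource()
// Set by the handler's cancellation callback (illustrative).
let cancelled = new ManualResetEventSlim(false)

// Messages are reply channels, so PostAndReply can serve as a "ping".
let agent =
    MailboxProcessor.Start(
        (fun (inbox : MailboxProcessor<AsyncReplyChannel<unit>>) ->
            async {
                // Registers a callback on the token the handler runs under.
                use! _onCancel = Async.OnCancel(fun () -> cancelled.Set())
                while true do
                    let! reply = inbox.Receive()
                    reply.Reply()
            }),
        cancellationToken = cts.Token)

// After this round trip the handler is running and OnCancel is registered.
agent.PostAndReply(fun reply -> reply)
cts.Cancel()
let wasCancelled = cancelled.Wait(5000)
printfn "handler cancelled: %b" wasCancelled
```

The same token could instead be passed to the constructor, followed by a call to the instance Start method.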

The mandatory parameter body is the main workhorse of the message handler. This function accepts a MailboxProcessor<_> (at runtime this is a reference to the MailboxProcessor instance associated with the handler) and returns an Async<unit>.

[Figure: mailboxprocessor]

The blue-bordered reference is the entry point for sending messages to the mailbox. The green-bordered one is used inside the handler to receive messages (though nothing prohibits a handler from sending messages to itself, as the sample below does).

MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
    async {
        printfn "Sending..."
        inbox.Post(10)
        let! r = inbox.Receive()
        printfn "Received %d" r
    }
)
(*
Sending...
Received 10
*)

We can group the members of MailboxProcessor by common usage scenario.

Methods that send messages to a MailboxProcessor

  • Post - asynchronously posts a message to the MailboxProcessor's message queue and returns immediately.
  • Synchronous send
    • PostAndReply - synchronously sends a message to the MailboxProcessor and waits for a reply. The buildMessage argument is a function that receives a reply channel and returns the message with that channel attached; the receiver (the MailboxProcessor handler) sends its reply through this channel. PostAndReply optionally accepts a timeout parameter; if no reply arrives before the timeout expires, a TimeoutException is raised.
    • TryPostAndReply - similar to PostAndReply, but returns None if no reply arrives within the given timeout.
  • Asynchronous send
    • PostAndAsyncReply - sends a message to the MailboxProcessor and returns a computation that awaits the reply. Timeout behavior is the same as for PostAndReply.
    • PostAndTryAsyncReply - similar to PostAndAsyncReply, but the computation returns None if no reply arrives within the given timeout.
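The sample below exercises PostAndReply, PostAndAsyncReply and PostAndTryAsyncReply, but not TryPostAndReply. Here is a minimal sketch of the synchronous Try variant; the SlowRequest message type and the delays are illustrative assumptions. One request finishes well within its timeout, the other does not.

```fsharp
// Hypothetical request: a delay in milliseconds plus a reply channel.
type SlowRequest = SlowRequest of int * AsyncReplyChannel<string>

let slow =
    MailboxProcessor.Start(fun (inbox : MailboxProcessor<SlowRequest>) ->
        let rec loop () =
            async {
                let! (SlowRequest(delay, reply)) = inbox.Receive()
                do! Async.Sleep delay
                reply.Reply(sprintf "done after %d ms" delay)
                return! loop ()
            }
        loop ())

// The handler answers well within the 2 s timeout, so the result is Some.
let fast = slow.TryPostAndReply((fun ch -> SlowRequest(10, ch)), timeout = 2000)
// The handler needs 2 s but we wait only 100 ms: None, and no exception.
let late = slow.TryPostAndReply((fun ch -> SlowRequest(2000, ch)), timeout = 100)
printfn "%A %A" fast late
```

Note that the late request is still processed; its reply simply goes to a channel nobody is waiting on any more.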
let print fmt = Printf.ksprintf (fun s -> printfn "%s: %s" (System.DateTime.Now.ToString("hh:mm:ss")) s) fmt
type Message = Async of int | Sync of int * AsyncReplyChannel<int>

let run1 () = MailboxProcessor.Start(fun (inbox : MailboxProcessor<Message>) ->
    async {
        while true do
            let! msg = inbox.Receive()
            match msg with
            | Async(v) ->
                print "async::%d" v
            | Sync(v, reply) ->
                print "sync::%d" v
                do! Async.Sleep(v)
                reply.Reply (v + 1)
    })
let run2 (other : MailboxProcessor<Message>) = MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
    async {
        let! timeout = inbox.Receive()
        print "sending with timeout %d" timeout
        let! value = other.PostAndAsyncReply(fun reply -> Sync(timeout, reply))
        print "received %d" value
        print "sending with timeout 1"
        let! r = other.PostAndTryAsyncReply((fun reply -> Sync(timeout, reply)), timeout = 1)
        match r with
        | Some(_) -> print "some"
        | None -> print "none"
    })
let a = run1 ()
let b = run2 a

a.Post(Async 1000)
let r = a.PostAndReply(fun reply -> Sync(3000, reply))
print "received %d" r
b.Post(1000)
(*
// message is sent directly to the mailbox
05:10:58: async::1000 // async post
05:10:58: sync::3000 // sync post
05:11:01: received 3001 // result received after 3 seconds
// message is sent from the handler of another mailbox
05:11:01: sending with timeout 1000 // sync send
05:11:01: sync::1000
05:11:02: received 1001 // success
05:11:02: sending with timeout 1 // send with a tiny timeout
05:11:02: sync::1000
05:11:02: none // timeout expired
*)

Remarks:

  1. We define one message type with two cases: Async, which carries only a value, and Sync, which carries a value and a reply channel.
  2. Asynchronous Post returns immediately (as expected).
  3. PostAndReply blocked for 3 seconds until the handler replied on the given channel.
  4. PostAndAsyncReply and PostAndTryAsyncReply can be used from async workflows, e.g. from the handler of another mailbox.
  5. On timeout, PostAndTryAsyncReply returns None without throwing an exception (as expected).

Receive methods

  • Receive and TryReceive - both methods return a computation that receives the first message in the message queue. They differ in how a timeout is handled: Receive throws a TimeoutException, while TryReceive returns None. The timeout can be passed as an optional argument; otherwise the DefaultTimeout property is used, which is Timeout.Infinite unless changed.
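A minimal sketch of the DefaultTimeout fallback: the handler calls TryReceive without a timeout argument, and nothing is ever posted. The names outcome and mbp are illustrative, and a TaskCompletionSource is used only so the result can be observed from outside the handler.

```fsharp
open System.Threading.Tasks

// Completed by the handler with a description of what it received.
let outcome = TaskCompletionSource<string>()

let mbp =
    new MailboxProcessor<int>(fun (inbox : MailboxProcessor<int>) ->
        async {
            // No explicit timeout: TryReceive falls back to inbox.DefaultTimeout.
            let! r = inbox.TryReceive()
            match r with
            | Some n -> outcome.SetResult(sprintf "got %d" n)
            | None -> outcome.SetResult("timed out")
        })

// Set before Start so the handler sees it; the value is in milliseconds.
mbp.DefaultTimeout <- 100
mbp.Start()

// Nothing is posted, so the handler should report a timeout.
let finished = outcome.Task.Wait(5000)
printfn "%s" (if finished then outcome.Task.Result else "no result")
```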

I've already shown some samples where Receive completes successfully; now it is time to demonstrate the failures:

// 1
MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
    async {
        printfn "1"
        let! r = inbox.TryReceive(timeout = 1)
        match r with
        | Some _ -> printfn "some"
        | None -> printfn "none"
        printfn "2"
        return()
    }
)
|> ignore
(*
1
none
2
*)

// 2
MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
async {
printfn "1"
try
do! inbox.Receive(timeout = 1) |> Async.Ignore
with
e -> printfn "%O" e
printfn "2"
return()
}
)
|> ignore
(*
1
>
System.TimeoutException: Mailbox.Receive timed out.
at <StartupCode$FSharp-Core>.$Control.processFirstArrival@1984-5.Invoke(Boolean _arg1024)
at Microsoft.FSharp.Control.AsyncBuilderImpl.args@720.Invoke(a a)
2
*)

// 3
let mbp = new MailboxProcessor<_>(fun (inbox : MailboxProcessor<int>) ->
    async {
        printfn "1"
        do! inbox.Receive(timeout = 1) |> Async.Ignore
        printfn "2"
        return()
    })
mbp.Error.Add(printfn "%O")
mbp.Start()

(*
1
>
System.TimeoutException: Mailbox.Receive timed out.
at <StartupCode$FSharp-Core>.$Control.processFirstArrival@1984-5.Invoke(Boolean _arg1024)
at Microsoft.FSharp.Control.AsyncBuilderImpl.args@720.Invoke(a a)
*)


Note: if an exception is raised inside the handler, you can either catch it explicitly (// 2) or subscribe to the Error event (// 3). In the latter case the handler body has already terminated by the time the event is raised, so you have no chance to recover.
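One way to keep a long-running handler alive (a common pattern, not something the post prescribes) is to catch exceptions per message inside the loop, so only the failing message is affected and Error is never raised. A minimal sketch; the Cmd message type is hypothetical, and Result requires F# 4.1 or later.

```fsharp
// Hypothetical command: divide 100 by the given number and reply with the outcome.
type Cmd = Divide of int * AsyncReplyChannel<Result<int, string>>

let calculator =
    MailboxProcessor.Start(fun (inbox : MailboxProcessor<Cmd>) ->
        let rec loop () =
            async {
                let! (Divide(n, reply)) = inbox.Receive()
                // Catch per message: the failure is reported to the caller
                // instead of terminating the handler.
                try
                    reply.Reply(Ok(100 / n))
                with e ->
                    reply.Reply(Error(e.Message))
                return! loop ()
            }
        loop ())

let r1 = calculator.PostAndReply(fun ch -> Divide(5, ch))
let r2 = calculator.PostAndReply(fun ch -> Divide(0, ch)) // division by zero inside the handler
let r3 = calculator.PostAndReply(fun ch -> Divide(4, ch)) // the handler is still alive
printfn "%A %A %A" r1 r2 r3
```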

Message queue state

  • CurrentQueueLength - returns the number of unprocessed messages in the message queue. Note that in a concurrent environment you should not rely heavily on this value, because it can change at any moment.
let mbp = MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
    async {
        while true do
            do! Async.Sleep 3000
            printfn "number of messages: %d" inbox.CurrentQueueLength
    })
(*

val mbp : MailboxProcessor<int>

> number of messages: 0
number of messages: 0
mbp.Post(1);;
val it : unit = ()
> number of messages: 1
number of messages: 1
mbp.Post(2);;
val it : unit = ()
> number of messages: 2
mbp.Post(3);;
val it : unit = ()
> number of messages: 3
number of messages: 3
number of messages: 3
*)

Selective receive

In small systems you can control the order in which messages are sent and received, but in large systems this becomes very difficult. When a huge number of concurrent participants are involved, messages can arrive in the mailbox in an unexpected order. In this case the ability to receive messages selectively becomes essential. I'll demonstrate it on a small sample. Imagine you create a supervisor process that spawns a number of children and waits for messages from them in a strictly specified order. Its state transitions are illustrated by the following diagram:

[Figure: supervisor state transitions]

But what happens if process B sends its completion signal before process A? We cannot simply discard that message, because then we would wait in the third state forever. Fortunately, Erlang offers a brilliant feature named selective receive. The source code for the supervisor transitions is below:

-module(selective_receive).
-export([waitA/0, waitB/0]).

waitA() ->
    receive
        {a, N} ->
            io:format("received A ~w~n", [N]),
            waitB()
    end.

waitB() ->
    receive
        {b, N} ->
            io:format("received B ~w~n", [N])
    end.


F# supports selective receive through two methods:


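  • Scan and TryScan - both accept a scanner function and return a computation that applies the scanner to the messages in the queue, in arrival order, and completes on the first message for which the scanner returns Some, running the async computation contained in that Some; if no queued message matches, it waits for new ones. The matched message is removed from the queue, while unmatched messages stay where they are. If no message matches before the timeout expires, Scan throws a TimeoutException and TryScan returns None.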

    type Message = A of int | B of int

    let p = MailboxProcessor.Start(fun (inbox : MailboxProcessor<_>) ->
        let rec waitA = async {
            let! msg = inbox.Scan(function A(v) -> Some (async.Return v) | _ -> None)
            printfn "Received A %d" msg
            return! waitB
        }
        and waitB = async {
            let! msg = inbox.Scan(function B(v) -> Some(async.Return v) | _ -> None)
            printfn "Received B %d" msg
        }
        waitA)

    printfn "Posting B"
    p.Post(B 100)
    printfn "Posting A"
    p.Post(A 100)
    (*
    Posting B
    Posting A
    Received A 100
    Received B 100
    *)
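A small sketch of TryScan under the same A/B message shape; the names outcome and p are illustrative. A B message is queued before the handler starts, the handler scans only for A with a 200 ms timeout and gets None, and a following Receive shows that the skipped B message is still waiting in the queue.

```fsharp
open System.Threading.Tasks

type Message = A of int | B of int

// Completed with (result of TryScan, next message received afterwards).
let outcome = TaskCompletionSource<int option * Message>()

let p =
    new MailboxProcessor<Message>(fun (inbox : MailboxProcessor<Message>) ->
        async {
            // Look only for A, giving up after 200 ms.
            let! a = inbox.TryScan((function A v -> Some(async.Return v) | _ -> None), timeout = 200)
            // The B message skipped by the scan is still queued.
            let! next = inbox.Receive(timeout = 1000)
            outcome.SetResult((a, next))
        })

p.Post(B 1) // queued before the handler starts
p.Start()

if not (outcome.Task.Wait(5000)) then failwith "handler did not complete"
let (scanned, leftover) = outcome.Task.Result
printfn "scanned = %A, leftover = %A" scanned leftover
```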

You may have noticed that the scanner returns Async<'T> option rather than 'T option, so we need to wrap v into an Async. Though this approach is general (the returned result can be any async workflow, not just a value), it is not very handy in the many scenarios where you just need to select a message. I'd be glad to see another overload of Scan with a simplified scanner in the standard library; meanwhile, it can be added as an extension method:

type Message = A of int | B of int
type Microsoft.FSharp.Control.MailboxProcessor<'T> with
    member this.Scan(scanner : 'T -> 'R option, ?timeout) =
        this.Scan(
            (fun v -> match scanner v with None -> None | Some(v) -> Some (async.Return v)),
            ?timeout = timeout)


let p = MailboxProcessor.Start(fun (inbox : MailboxProcessor<_>) ->
    let rec waitA = async {
        let! msg = inbox.Scan(function A(v) -> Some (v) | _ -> None)
        printfn "Received A %d" msg
        return! waitB
    }
    and waitB = async {
        let! msg = inbox.Scan(function B(v) -> Some(v) | _ -> None)
        printfn "Received B %d" msg
    }
    waitA)

printfn "Posting B"
p.Post(B 100)
printfn "Posting A"
p.Post(A 100)
(*
Posting B
Posting A
Received A 100
Received B 100
*)

What I would like to change/add/...

  1. I think the method names could be more descriptive. I don't mean the simple ones (Post and Receive) but rather the creepy ones like PostAndTryAsyncReply, TryPostAndReply or PostAndAsyncReply. I am always confused by the mix of Post, Try, Async and Reply components, so I propose renaming PostAndReply to Send and changing the names of the remaining methods accordingly. The Send/Post pair occurs in various .NET APIs (for example, SynchronizationContext.Send and SynchronizationContext.Post), and the meaning of each method is clearly defined: Send is the synchronous version and Post the asynchronous one.


    Now                    Proposed
    Post                   Post
    PostAndReply           Send
    TryPostAndReply        TrySend
    PostAndAsyncReply      SendAsync
    PostAndTryAsyncReply   TrySendAsync


  2. Process hierarchies are a very interesting way of handling faults in large systems: you can program worker code to handle the correct scenario and rely on a supervisor in case of errors. Scala has the Akka framework, which uses the same principles of fault tolerance. However, for now it is non-trivial to apply this approach to F# mailbox processors. Take the simplest case: one process spawns another and subscribes to its Error event. If the child process unexpectedly dies, the parent receives the notification with the exception... on the former child's thread. The question arises: how do we propagate the exception to the parent's thread? The simplest solution is to store the exception in a private field and re-raise it on the next call to the MailboxProcessor (i.e. Receive). However, the handler uses MailboxProcessor directly, and members of MailboxProcessor are not virtual, so I would need to create a wrapper, redefine the handler in terms of the wrapper, and delegate all calls to the actual MailboxProcessor, checking the exception field beforehand. Such a solution is possible, but it would be better if MailboxProcessor supported structuring processors into hierarchies with unidirectional and bidirectional links out of the box.

  3. Split the interface of MailboxProcessor into sender and receiver parts, so that receiving can only be done from inside the handler.
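For the first proposal, the suggested names can be tried out today as F# extension members that simply forward to the existing methods. A minimal sketch: Send, TrySend, SendAsync and TrySendAsync are the proposed names from the list above, not part of FSharp.Core, and the upper agent is only for demonstration.

```fsharp
// Optional extension: hypothetical aliases forwarding to the real methods.
type MailboxProcessor<'Msg> with
    /// PostAndReply under its proposed name.
    member this.Send(buildMessage : AsyncReplyChannel<'Reply> -> 'Msg, ?timeout : int) : 'Reply =
        this.PostAndReply(buildMessage, ?timeout = timeout)
    /// TryPostAndReply under its proposed name.
    member this.TrySend(buildMessage : AsyncReplyChannel<'Reply> -> 'Msg, ?timeout : int) : 'Reply option =
        this.TryPostAndReply(buildMessage, ?timeout = timeout)
    /// PostAndAsyncReply under its proposed name.
    member this.SendAsync(buildMessage : AsyncReplyChannel<'Reply> -> 'Msg, ?timeout : int) : Async<'Reply> =
        this.PostAndAsyncReply(buildMessage, ?timeout = timeout)
    /// PostAndTryAsyncReply under its proposed name.
    member this.TrySendAsync(buildMessage : AsyncReplyChannel<'Reply> -> 'Msg, ?timeout : int) : Async<'Reply option> =
        this.PostAndTryAsyncReply(buildMessage, ?timeout = timeout)

// Usage: an agent that upper-cases the text it receives.
let upper =
    MailboxProcessor.Start(fun (inbox : MailboxProcessor<AsyncReplyChannel<string> * string>) ->
        let rec loop () =
            async {
                let! (reply, text) = inbox.Receive()
                reply.Reply(text.ToUpper())
                return! loop ()
            }
        loop ())

let s1 = upper.Send(fun ch -> (ch, "send"))
let s2 = upper.TrySend((fun ch -> (ch, "try")), timeout = 1000)
let s3 = upper.SendAsync(fun ch -> (ch, "async")) |> Async.RunSynchronously
printfn "%s %A %s" s1 s2 s3
```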
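For the second proposal, a different technique from the field-based wrapper described above avoids wrapping altogether: the child's Error handler re-posts the exception to the parent's mailbox as an ordinary message, so the failure is processed inside the parent's own handler rather than on the child's side. A minimal sketch, assuming the parent creates and owns the child; SupervisorMsg, ChildFailed, handled and supervisor are hypothetical names, and a real supervisor would restart the child instead of just recording the message.

```fsharp
open System.Threading.Tasks

// Hypothetical message type for the parent (supervisor) agent.
type SupervisorMsg = ChildFailed of exn

// Completed by the supervisor once it has processed a child failure.
let handled = TaskCompletionSource<string>()

let supervisor =
    MailboxProcessor.Start(fun (inbox : MailboxProcessor<SupervisorMsg>) ->
        // Child agent that fails on its first message.
        let child =
            new MailboxProcessor<int>(fun (childInbox : MailboxProcessor<int>) ->
                async {
                    let! n = childInbox.Receive()
                    failwithf "child cannot handle %d" n
                })
        // Error is raised on the child's side; re-post the exception so it
        // is handled inside the supervisor's own message loop.
        child.Error.Add(fun e -> inbox.Post(ChildFailed e))
        child.Start()
        child.Post 42
        async {
            let! (ChildFailed e) = inbox.Receive()
            // A real supervisor might log this and restart the child here.
            handled.SetResult(e.Message)
        })

let ok = handled.Task.Wait(5000)
printfn "supervisor saw: %s" (if ok then handled.Task.Result else "nothing")
```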
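For the third proposal, a thin wrapper can approximate the split: callers get an object that exposes only the sending side, while the handler still receives the full MailboxProcessor through its inbox argument. The Sender type and startAgent function are hypothetical, and only Post and PostAndReply are forwarded to keep the sketch short.

```fsharp
/// Hypothetical sender-side view of an agent: it can only put messages in.
type Sender<'Msg>(agent : MailboxProcessor<'Msg>) =
    member this.Post(msg : 'Msg) = agent.Post(msg)
    member this.PostAndReply(buildMessage : AsyncReplyChannel<'Reply> -> 'Msg) : 'Reply =
        agent.PostAndReply(buildMessage)

/// Starts the agent and returns only the sender view; Receive and Scan
/// stay reachable only inside the handler.
let startAgent (body : MailboxProcessor<'Msg> -> Async<unit>) =
    Sender(MailboxProcessor.Start(body))

type CounterMsg =
    | Incr
    | Get of AsyncReplyChannel<int>

let counter =
    startAgent (fun (inbox : MailboxProcessor<CounterMsg>) ->
        let rec loop n =
            async {
                let! msg = inbox.Receive()
                match msg with
                | Incr -> return! loop (n + 1)
                | Get reply ->
                    reply.Reply n
                    return! loop n
            }
        loop 0)

counter.Post Incr
counter.Post Incr
let total = counter.PostAndReply(Get)
printfn "total = %d" total
```

Because messages from a single sender are processed in order, the Get request sees both increments.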

3 comments:

  1. Thank you very much for this post.

    As I understand it, MailboxProcessor brings only Erlang-style message-passing capabilities to F#. But what about lightweight processes? How can F# handle up to 20 million coexisting processes as effectively as Erlang does?

  2. I don't think MailboxProcessors can be as effective as Erlang processes with that many participants. The Erlang VM was designed for these scenarios from the start: its processes are not mapped to native threads or OS processes; they are rather like green threads scheduled by the VM. By contrast, MailboxProcessors internally use the CLR ThreadPool, and CLR threads (in the current implementation) map one-to-one onto OS threads, with all the consequences: additional memory consumption, context-switch overhead, etc.

  3. Thanks again, that is exactly what I meant. I'm just interested in implementing Erlang-style green processes in .NET, but I still haven't found any ready-to-use solutions.
