четверг, 29 апреля 2010 г.

Nature of value restriction

Introduction

Value restriction is an error that firstly makes a direct hit in the brain of non-prepared developer. It has no straight analogues in the majority of mainstream languages and thus very confusing. Fortunately error message produced by compiler contains comprehensive solution. However IMO good developer cannot use the tool without having a knowledge how it works under the hood. Otherwise programming becomes some sort of magic… This is not the way of Jedi.

Let’s start from the very beginning. In addition to other useful and interesting features F# supports parametric polymorphism. This means that we can define data types or functions that handle all arguments in the uniform way without depending on its actual type. For example:

// declare type parameters explicitly
let len<'T> (l : 'T list) = List.length l // 'T list -> int
// F# type inference mechanism infers most generic type
let len2 l = List.length l // 'T list -> int

let l1 = len [1;2;3]
let l2 = len ["1";"2";"3"]

let l3 = len2 [1;2;3]
let l4 = len2 ["1";"2";"3"]

Len and Len2 are polymorphic functions; they can operate on arbitrary lists.

In addition to polymorphic functions and data types F# allows to declare polymorphic values, the most well-known example is empty list.

let nil = [] // 'a list
let a = 1::nil // int list
let b = "1"::nil // string list

In the sample above nil acts as a value of int list and string list. Another piece of code, at that time with option:

let none = None // 'a option
let intRes = defaultArg none 1 // int
let stringRes = defaultArg none "1" // string

Everything is cool and safe, but note – we’ve used only immutable values. The following sample is hypothetical, it doesn’t compile (due to value restriction) but nicely demonstrate the potential problems when mutability and side effects are allowed.

let v = ref None

Imagine that this piece of code is valid and v has type ref option<’T>. Then next few harmless statements will cause crash of application in runtime.

do v := Some 1
let r = defaultArg !v "A" // kaboom!!1 (we take int value from reference cell and try to treat it as a string)

Type checker thinks that these statements are perfectly correct because v is polymorphic. Luckily F# doesn’t let this happen and it is achieved by the value restriction.

Value restriction

The idea behind the solution is pretty simple; we need to divide all let bindings into two groups: first – that can be polymorphic and second – that should be bound to one particular type. The principle of splitting is based on the syntactic definition of values, automatic generalization is performed only on syntactic functions and on simple immutable values. All other bindings are not classified as values and so cannot be automatically generalized, basically compiler will suffer from lack of sufficient type information in a non-generic construct. Some general samples:

// applying polymorphic function to polymorphic value
let v = id []
// partial application
let map_id = Seq.map id

// create object with constructor inferring of type arguments
type A<'T>() = class end
let v1 = A<_>()

// create object omitting type arguments
type A<'T>() = class end
let create<'T> = A<'T>()
let a = create

Note: these samples cover all cases from the MSDN library. Also there is an error in MSDN article: code from case 1 compiles and executes without any problems.

Taming value restriction

As we now know what causes value restriction then we have a number of solutions on the pockets.

  1. Compiler doesn’t have all necessary information regarding types of the expression – we can give him a hint by annotating types explicitly

    let v : int list = id []
    let map_id : string list -> string seq = Seq.map id

    type A<'T>() = class end
    let v1 = A<int>()

    let create<'T> = A<'T>()
    let a = create<string>

    Simple solution but non-efficient in general cases - we lose polymorphism

  2. It is possible to convert non-value to value (syntactic function) by performing eta-conversion

    // added fake unit argument
    let v () = id []
    // added l (list) argument
    let map_id l = Seq.map id l
  3. Sometimes value restriction can be removed by reordering code elements so missing type information can be obtained from latter usage.

    // comments annotates operations that helps type inference engine

    let v = id []
    let intList = 1::v // cons operation

    let map_id = Seq.map id
    let res = map_id [1;2;3] // particular type of list

    type A<'T>() =
    let mutable v = Unchecked.defaultof<'T>
    member x.Value
    with set(value) = v <- value

    let a = A<_>()
    a.Value <- 10 // type of property (and of entire type) is inferred from assigned value

    Note: This approach is often cannot be used in FSI sessions due to specific of FSI (type for input expressions should be inferred immediately).

  4. F# standard library offers very interesting possibility to run generalization process by applying GeneralizableValue attribute to so-called type functions. Type function is let binding with generic arguments and empty argument list. F# spec defines designation of type functions:

    Type functions are typically used for:

    • Pure functions that compute type-specific information based on the supplied type arguments
    • Pure functions whose result is independent of inferred type arguments, e.g. empty sets and maps

    Type function can be annotated either with RequireExplicitTypeArgumentsAttribute (generic arguments should be set explicitly) or GeneralizableValueAttribute (generic arguments will be inferred).

    type A<'T>() = class end
    [<GeneralizableValue>]
    let create<'T> = A<'T>()
    let r = create
    printfn "%s" (r.GetType().FullName)

    Combination of type function and GeneralizableValueAttribute can be very confusing. Imagine:

    [<GeneralizableValue>]
    let none<'T> : 'T option ref = ref None

    The fact that none can be used without type parameters makes it appear like simple value. Don’t let this view fool you – this is function that returns new ref to None on every call.

    printfn "%O" (Option.isNone !none) // true
    none := Some(1)
    printfn "%O" (Option.isNone !none) // true

воскресенье, 18 апреля 2010 г.

MailboxProcessors: Practical Application

Last time we have examined the API of MailboxProcessors. Now it is time to see, how this concept can be applied to solve real-world problems.

Just as a quick reminder - "ping-pong" sample: two mailboxes, first one sends Ping request to second one, second replies with Pong. After specified number of iteratons execution stops.

type Message = Ping of AsyncReplyChannel<unit> | Stop

let pong (inbox : MailboxProcessor<Message>)=
let rec loop = async {
let! msg = inbox.Receive()
match msg with
| Ping(reply) ->
reply.Reply()
return! loop
| Stop -> ()
}
loop
let ping = MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
let rec start () = async {
printfn "ping::waiting for number of iterations..."
let! n = inbox.Receive()
printfn "ping::%d received, start sending ping" n
do! loop n (MailboxProcessor.Start(pong))
}
and loop n (pong : MailboxProcessor<Message>) = async {
if n > 0 then
printfn "ping::sending ping, %d remaining" n
do! pong.PostAndAsyncReply(Ping)
printfn "ping::pong received"
do! loop (n - 1) pong
else
printfn "ping::exiting"
pong.Post(Stop)
}
start ()
)
ping.Post(5)
(*
ping::waiting for number of iterations...
ping::5 received, start sending ping
ping::sending ping, 5 remaining
ping::pong received
ping::sending ping, 4 remaining
ping::pong received
ping::sending ping, 3 remaining
ping::pong received
ping::sending ping, 2 remaining
ping::pong received
ping::sending ping, 1 remaining
ping::pong received
ping::exiting
*)

As opposed to trivial Pong function, Ping has a noteworthy moment, it defines a simple state machine with two states. First - obtaining for number of iterations, second - sending ping and waiting for reply.

Ping-pong sample fits nice for demo purposes but suffers from grave shortcoming - it is purely synthetic and useless in real life. It would be very nice to see the sample where MailboxProcessors can uncover their power and capabilities...

Web Sockets

The Web sockets specification - developed as part of the HTML5 initiative - introduced the Web Socket JavaScript interface, which defines a full-duplex single socket connection over which messages can be sent between client and server. The Web Socket standard attempts to simplify much of the complexity around bi-directional web communication and connection management.

The idea behind Web Sockets is very simple - client initiates TCP connection and performs handshake with server. After successful completion of handshake web sockets works pretty similar to usual sockets. Data is sent in the form of UTF-8 text. Frame can contain data where high bit is not set (0x00 to 0x7F) - this kind id data is transferred as a stream of bytes started with 0x00 and terminated with 0xFF. If high bit in data is set then this data should have leading length indicator which is encoded as a series of 7-bit bytes stored in octets with the 8th bit being set for all but the last byte.

For demonstration we will create simple web socket server that accepts requests from browser (Google Chrome already supports web sockets so it will be out test animal) and periodically sent JSON-encoded messages with current time information.

Server components.

Server
  • Request dispatcher accepts incoming connections and spawns worker to handle client requests
  • Worker sends and receives messages from particular client. Worker also incapluates all transport/serialization details so external components just post messages to worker.
  • Controller maintains list of workers and broadcasts messages.
  • Timer periodically send messages to Controller.
open System
open System.IO
open System.Net
open System.Net.Sockets
open System.Text
open System.Threading
open System.Runtime.Serialization

[<DataContract>]
type Time =
{ [<DataMember(Name = "hour")>] mutable Hour : int
[<DataMember(Name = "minute")>] mutable Minute : int
[<DataMember(Name = "second")>] mutable Second : int }
static member New(dt : DateTime) = {Hour = dt.Hour; Minute = dt.Minute; Second = dt.Second}

type Msg =
| Connect of MailboxProcessor<Time>
| Disconnect of MailboxProcessor<Time>
| Tick of Time

let startMailboxProcessor ct f = MailboxProcessor.Start(f, cancellationToken = ct)

let timer (ctrl : MailboxProcessor<Msg>) interval = async {
while true do
do! Async.Sleep interval
ctrl.Post(Tick <| Time.New(DateTime.Now))
}

let runController (ct : CancellationToken) = startMailboxProcessor ct (fun (inbox : MailboxProcessor<Msg>) ->
let listeners = new ResizeArray<_>()
async {
while not ct.IsCancellationRequested do
let! msg = inbox.Receive()
match msg with
| Connect l -> listeners.Add(l)
| Disconnect l -> listeners.Remove(l) |> ignore
| Tick msg -> listeners.ForEach(fun l -> l.Post msg)
}
)

let runWorker (tcp : TcpClient) (ctrl : MailboxProcessor<Msg>) ct = ignore <| startMailboxProcessor ct (fun (inbox : MailboxProcessor<Time>) ->
let rec handshake = async {
let ns = tcp.GetStream()
let reader = new StreamReader(ns)
let lines =
reader.ReadLine()
|> Seq.unfold(fun l -> if String.IsNullOrEmpty l then None else Some(l, reader.ReadLine()))
|> Seq.toArray

match lines with
| [| "GET /timer HTTP/1.1"; "Upgrade: WebSocket"; "Connection: Upgrade"; _; _|] ->
// TODO : parse WebSocket-Origin and WebSocket-Location

// send server handshake part
let serverHandshakePart =
"HTTP/1.1 101 Web Socket Protocol Handshake
Upgrade: WebSocket
Connection: Upgrade
WebSocket-Origin: file://
WebSocket-Location: ws://localhost/timer

"B
do! ns.AsyncWrite(serverHandshakePart)
return! run ns
| _ ->
//validation failed - close connection
tcp.Close()
}
and run (ns : NetworkStream) = async {
let json = System.Runtime.Serialization.Json.DataContractJsonSerializer(typeof<Time>)
ctrl.Post(Connect inbox)
try
while not ct.IsCancellationRequested do
let! time = inbox.Receive()

let ms = new MemoryStream()
json.WriteObject(ms, time)

do ns.WriteByte(byte 0x00)
do! ns.AsyncWrite(ms.ToArray())
do ns.WriteByte(byte 0xFF)
finally
ns.Close()
ctrl.Post(Disconnect inbox)
}
handshake
)

let runRequestDispatcher () =
let listener = new TcpListener(IPAddress.Loopback, 80)
let cts = new CancellationTokenSource()
let token = cts.Token

let controller = runController token
Async.Start (timer controller 1000, token)

let main = async {
try
listener.Start(10)
while not cts.IsCancellationRequested do
let! client = Async.FromBeginEnd(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient)
runWorker client controller token
finally
listener.Stop()
}

Async.Start(main, token)

{ new IDisposable with member x.Dispose() = cts.Cancel()}

let dispose = runRequestDispatcher ()
printfn "press any key to stop..."
Console.ReadKey() |> ignore
dispose.Dispose()

index.html

<html>
<head>
<script>
ws = new WebSocket("ws://localhost/timer");
ws.onopen = function() { alert("Opened"); };
ws.onclose = function() { alert("Closed"); };
ws.onmessage = function(evt) {
var date = eval('(' + evt.data + ')');
hour.innerText = date.hour;
minute.innerText = date.minute;
second.innerText = date.second;
};
</script>
<style type="text/css">
table, td, th
{
font-family:"Trebuchet MS", Arial, Helvetica, sans-serif;
border:1px solid #A7C942;
padding:3px 7px 2px 7px;
text-align:center;
}
td
{
font-weight:bold;
font-size:20px;
}
th
{
font-size:14px;
font-style:italic;
background-color:#A7C942;
color:white;
}
</style>
</head>
<body>
<table>
<tr>
<th>Hour</th>
<th>Minute</th>
<th>Second</th>
</tr>
<tr>
<td id="hour"></td>
<td id="minute"></td>
<td id="second"></td>
</tr>
</table>
</body>
</html>

I've opened 3 Chrome windows thus creating 3 workers. As we can notice that timer values in all browsers are changed simultaneously

p1

p2

 p3

понедельник, 12 апреля 2010 г.

MailboxProcessors: Erlang-style concurrency in F#

Erlang is the language designed by Ericsson with primary application domain: distributed, fault-tolerant applications. The basic concurrency-related unit in Erlang - lightweight process. These processes has nothing similar with processes of OS, but rather green threads. Processes comminicate by asynchronous copy-nothing message-passing: every process has associated mailbox with messages yet consumed and messages can be retrieved from mailbox without preserving original order of receiveing. Creation and destruction of process is very fast and number of coexisting processes in running system can be large (up to 20 million). The essential feature is giving one process ability to monitor state of another process so processes can form a hierarchy of workers and supervisors.

Concurrency style applied in Erlang is very natural for human pattern of thought (it is well described in Chapter 7 "Concurrency" of "Programming Erlang : Software for a concurrent world" by Joe Armstrong) so it is not surprising that libraries emulating such style exists in many languages: Scala Actors, Retlang (C#). Axum (MS Research incubation project) is based on the actor model. F# is not exception, its standard library includes MailboxProcessor - the hero of todays post.

MailboxProcessor is combination of message queue and message handler build atop of Asyncs infrastructure. It is usually created with MailboxProcessor.Start method though you can invoke constructor directly . Static method Start is just a shortcut that subsequently calls the constructor and instance Start method. Instance Start method starts execution of handler using thread from ThreadPool. Parts (//1) and (//2) in the sample below are equvalent.

let body = fun (_ : MailboxProcessor<int>) -> async {return ()}

// 1
MailboxProcessor.Start(body)

// 2
let mbp = new MailboxProcessor<_>(body)
mbp.Start()

Further samples will be written with (1) to make the declaration shorter. Both versions of Start method accept optional parameter cancellationToken to cancel execution of handler body.

Mandatory parameter body is the main workhorse of message handler. This function accepts MailboxProcessor<_> ( in runtime this will be reference to instance of MailboxProcessor associated with this handler).

mailboxprocessor

Blue bordered reference is the point for sending messages in mailbox. Green bordered one is used in handler to receive messages (though it is not prohibited to send message to itself)

MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
async {
printfn "Sending..."
inbox.Post(10)
let! r = inbox.Receive()
printfn "Received %d" r
}
)
(*
Sending...
Received 10
*)

We can split members of MailboxProcessor according to common usage scenarios.

Methods that send message to MailboxProcessor

  • Post - asynchronously posts message in MailboxProcessor message queue.
  • Synchronous send
    • PostAndReply - synchronously sends message to MailboxProcessor and awaits for reply. Argument buildMessage should return the message with attached replyChannel. Receiver (MailboxProcessor) sends reply through provided reply channel. PostAndReply optionally accept timeout parameter. If reply wasn't received after timeout expiration then TimeoutException is raised
    • TryPostAndReply - similar to PostAndReply but returns None if method doesn't return in given timeout
  • Asynchronous send
    • PostAndAsyncReply - sends message to MailboxProcessor and returns computation that await the reply. Timeout behavior is the same with PostAndReply
    • PostAndTryAsyncReply - similar to PostAndAsyncReply but returns None if methods doesn't return in given timeout
let print fmt = Printf.ksprintf (fun s -> printfn "%s: %s" (System.DateTime.Now.ToString("hh::mm:ss")) s) fmt 
type Message = Async of int | Sync of int * AsyncReplyChannel<int>

let run1 () = MailboxProcessor.Start(fun (inbox : MailboxProcessor<Message>) ->
async {
while true do
let! msg = inbox.Receive()
match msg with
| Async(v) ->
print "async::%d" v
| Sync(v, reply) ->
print "sync::%d" v
do! Async.Sleep(v)
reply.Reply (v + 1)
}
)
let run2 (other : MailboxProcessor<Message>) = MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
async {
let! timeout = inbox.Receive()
print "sending with timeout %d" timeout
let! value = other.PostAndAsyncReply(fun reply -> Sync(timeout, reply))
print "received %d" value
print "sending with timeout 1"
let! r = other.PostAndTryAsyncReply((fun reply -> Sync(timeout, reply)), timeout = 1)
match r with
| Some(_) -> print "some"
| None -> print "none"
}
)
let a = run1 ()
let b = run2 a

a.Post(Async 1000)
let r = a.PostAndReply(fun reply -> Sync(3000, reply))
print "received %d" r
b.Post(1000)
(*
// message is send directly to mailbox
05::10:58: async::1000 // async post
05::10:58: sync::3000 // sync post
05::11:01: received 3001 // result received after 3 seconds
// message is send from the handler of another mailbox
05::11:01: sending with timeout 1000 // sync send
05::11:01: sync::1000
05::11:02: received 1001 // success
05::11:02: sending with timeout 1 // send with little timeout
05::11:02: sync::1000
05::11:02: none // timeout expired
*)

Remarks:

  1. We define two message types: Async that contains only value and Sync - with value and reply channel
  2. Asynchronous Post returns immediately (as expected)
  3. PostAndReply was suspended for 3 seconds until handler replies to given channel.
  4. PostAndAsyncReply and PostAndTryAsyncReply can be used from async workflows, i.e. from the handler of another mailbox.
  5. PostAndTryAsyncReply on timeout returns None without throwing exception (as expected)

Receive methods.

  • Receive and TryReceive - both methods return computation that receives first message in message queue. The difference lies in the way of processing timeout: Receive throws TimeoutException and TryReceive returns None. By default timeout value is Timeout.Infinite but it can be changed with DefaultTimeout property.

I've already shown some samples with successful completion of Receive, now it is time to demonstrate the failures:

// 1
MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
async {
printfn "1"
let! r = inbox.TryReceive(timeout = 1)
match r with
| Some _ -> printfn "some"
| None -> printfn "none"
printfn "2"
return()
}
)
|> ignore
(*
1
none
2
*)

// 2
MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
async {
printfn "1"
try
do! inbox.Receive(timeout = 1) |> Async.Ignore
with
e -> printfn "%O" e
printfn "2"
return()
}
)
|> ignore
(*
1
>
System.TimeoutException: Mailbox.Receive timed out.
at <StartupCode$FSharp-Core>.$Control.processFirstArrival@1984-5.Invoke(Boolean _arg1024)
at Microsoft.FSharp.Control.AsyncBuilderImpl.args@720.Invoke(a a)
2
*)

// 3
let mbp = new MailboxProcessor<_>(fun (inbox : MailboxProcessor<int>) ->
async {
printfn "1"
do! inbox.Receive(timeout = 1) |> Async.Ignore
printfn "2"
return()
}
)
mbp.Error.Add(printfn "%O")
mbp.Start()

(*
1
>
System.TimeoutException: Mailbox.Receive timed out.
at <StartupCode$FSharp-Core>.$Control.processFirstArrival@1984-5.Invoke(Boolean _arg1024)
at Microsoft.FSharp.Control.AsyncBuilderImpl.args@720.Invoke(a a)
*)


Note: if exception is raised inside the handler you can either capture it explicity (// 2) or subscribe to Error event (// 3). In latter case handler body is terminated and only after that event is raised so you won't have chance to recover.

Message queue state

  • CurrentQueueLength - returns number of unprocessed messages in message queue. Note that in concurrent environment you should not strongly rely on this value because it can be quickly changed.
let mbp = MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
async {
while true do
do! Async.Sleep 3000
printfn "number of messages: %d" inbox.CurrentQueueLength
}
)
(*

val mbp : MailboxProcessor<int>

> number of messages: 0
number of messages: 0
mbp.Post(1);;
val it : unit = ()
> number of messages: 1
number of messages: 1
mbp.Post(2);;
val it : unit = ()
> number of messages: 2
mbp.Post(3);;
val it : unit = ()
> number of messages: 3
number of messages: 3
number of messages: 3
*)

Selective receive

In small systems you can control the order of sending/receiving messages, but large systems makes this very difficult. When huge amount of concurrent paticipants are involved in process then messages can come into mailbox into unexpected order. In this case possibility to selectively receive message become essential. I'll demonstrate it on the small sample. Imaging you create supervisor process that spawns a number of children and waits messages from them in a strongly specified order, so we can illustrate state transitions with the following diagram

states

But what happens if process B will send completion signal before process A. We cannot ignore it because in this case we will wait in the third state forever. Gladly Erlang offers a brilliant feature named selective receive. Source code for supervisor transitions is below:

-module(selective_receive).
-export([waitA/0, waitB/0]).

waitA() ->
receive
{a, N} ->
io:format("received A ~w~n", [N]),
waitB()
end.
waitB() ->
receive
{b, N} ->
io:format("received B ~w~n", [N])
end.

sr

F# supports selective receive through two methods


  • Scan and TryScan - Scan/TryScan accepts scanner function and returns computation that applies scanner to all elements in message queue and returns on the first Some result. Matched element is removed from the queue. If after timeout expiration no element is matched, Scan throws TimeoutException and TryScan returns Async with None value.

    type Message = A of int | B of int

    let p = MailboxProcessor.Start(fun (inbox : MailboxProcessor<_>) ->
    let rec waitA = async {
    let! msg = inbox.Scan(function A(v) -> Some (async.Return v) | _ -> None)
    printfn "Received A %d" msg
    return! waitB
    }
    and waitB = async {
    let! msg = inbox.Scan(function B(v) -> Some(async.Return v) | _ -> None)
    printfn "Received B %d" msg
    }
    waitA
    )

    printfn "Posting B"
    p.Post(B 100)
    printfn "Posting A"
    p.Post(A 100)
    (*
    Posting B
    Posting A
    Received A 100
    Received B 100
    *)

You possibly have already noticed that scanner returns Async<'T> not 'T, so we need to wrap v into Async. Thought this approach is generic (it is possible to refer another async workflow inside returned result) it is not very handy in a large number of scenarios when you need just to select message. I'll be glad to see another overload of Scan with simplified scanner in the standard library.

type Message = A of int | B of int
type Microsoft.FSharp.Control.MailboxProcessor<'T> with
member this.Scan(scanner : 'T -> 'R option, ?timeout) =
this.Scan(
(fun v -> match scanner v with None -> None | Some(v) -> Some (async.Return v)),
?timeout = timeout
)


let p = MailboxProcessor.Start(fun (inbox : MailboxProcessor<_>) ->
let rec waitA = async {
let! msg = inbox.Scan(function A(v) -> Some (v) | _ -> None)
printfn "Received A %d" msg
return! waitB
}
and waitB = async {
let! msg = inbox.Scan(function B(v) -> Some(v) | _ -> None)
printfn "Received B %d" msg
}
waitA
)

printfn "Posting B"
p.Post(B 100)
printfn "Posting A"
p.Post(A 100)
(*
Posting B
Posting A
Received A 100
Received B 100
*)

What I would like to change/add/...

  1. I think that method names can be more descriptive. I don't mean simple ones (Post and Receive) but rather creepy things like PostAndTryAsyncReply or TryPostAndReply or PostAndAsyncReply. I am always confused with mix of all these Post, Try, Async, and Reply components so I propose to rename PostAndReply to Send and change names of remaining methods accordingly. Combination of Send and Post is often occured in different .NET API and meaning of each method is clearly defined: Send is synchronous version and Post - asynchronous.


    NowProposed
    Post Post
    PostAndReplySend
    TryPostAndReplyTrySend
    PostAndAsyncReply SendAsync
    PostAndTryAsyncReply TrySendAsync


  2. Process hierarchies is a very interesting way for handling faults in a large systems, you can program worker code to handle the correct scenario and rely on supervisor in case of errors. Scala has framework Akka that utilized the same principles of fault tolerance. However it is non-trivial for now to apply this approach to F# mailbox processors. The simpliest case: for example one process spawns another and subscribes its on Error event. If child process occasionaly dies parent receives the notification with exception...in the ex-child thread. The question arises: how to propagate exception into parent thread. The simplies solution is store exception in private field and reraise on the first call to MailboxProcessor (i.e Receive). However handler utilizes MailboxProcessor directly and members of MailboxProcessor are not virtual so I need to create wrapper, redefine handler in the terms of trapper and delegate all calls to actual MessageProcessor checking the exception field beforehand. Such solution is possible but it will be better if MailboxProcessor can initially support structuring them in hierarchies with unidirectional and bidirectional links.

  3. Separate interface of MailboxProcessor to sender and receiver parts so receiving can only be made from inside the handler

пятница, 2 апреля 2010 г.

Evoking by "Patterns of Parallel Programming" 1.1

In previous post I've made the sketch of Async.Any method, however code in sample should not be treated as a real-world implementation but rather as a demonstration of approach. Now we will improve existing version so the interest to it can be switched from academical to practical. Many thanks to Dmitry Lomov, as he reveals number of very serious flaws.

Let's list all imperfections of current solution:

  1. User continuation was invoked under the lock as well as (possibly) registered cancellation handlers
  2. CancellationTokenSource is not disposed after finishing the computation
  3. Exceptions are not handled at all
  4. All computations are executed in new fresh thread instead of enqueueing job to ThreadPool
  5. Base computation can already have ambient cancellation token and for now this token is ignored.

And now without further ado I'm glad to present you new version:

open System.Threading

type Async with
static member Any(asyncs : seq<Async<'T>>) =
let arr = Seq.toArray asyncs

// signals cancel only once, subsequent calls are ignored
// returns true if cancel signal issued; otherwise - false
let cancel cancelled (cts : CancellationTokenSource) =
if Interlocked.CompareExchange(cancelled, 1, 0) = 0
then cts.Cancel(); true
else false

let createComputation cancelled (cts : CancellationTokenSource) = Async.FromContinuations(fun (cont, econt, ccont) ->

let remaining = ref (int64 arr.Length)
if !remaining = 0L then failwith "Sequence should contain at least one computation"

let cancelled = ref 0

// decrements number of active computations
// disposes cancellation token source when last computation ends
// returns true if Dispose is called; otherwise - false
let decrement () =
if Interlocked.Decrement(remaining) = 0L
then cts.Dispose(); true
else false

let exceptions = new ResizeArray<_>()

let saveSuccess v =
if cancel cancelled cts
then
decrement() |> ignore
cont v
else
decrement() |> ignore

// stores exception
// calls error econt if all subcomputations were completed with errors
let saveError ex =
lock exceptions (fun () -> exceptions.Add(ex))
if decrement() && (exceptions.Count = arr.Length) then
econt(new System.AggregateException(exceptions))

for c in arr do
Async.StartWithContinuations(
async {
do! Async.SwitchToThreadPool()
return! c
},
saveSuccess,
saveError,
(fun _ -> decrement() |> ignore),
cancellationToken = cts.Token
)
)
async {
let cancelled = ref 0
let cts = new CancellationTokenSource()
use! d = Async.OnCancel(fun() -> cancel cancelled cts |> ignore)
let! r = createComputation cancelled cts
return r
}

Notes:

  1. cancel function can be called multiple times. First time leads to triggering cancellation on specified CancellationTokenSource,subsequent calls are ignored. When success continuation (saveSuccess) is invoked for the first time - it signals cancellation and delegates actual continuation.
  2. decrement function counts down when every computation ends (independently from completion type). Finally it disposes associated CancellationTokenSource.
  3. Async.OnCancel propagates externally signalled cancellation to nested computations.
 
GeekySpeaky: Submit Your Site!