Wednesday, February 24, 2010

Adapter for event-based pattern. With a blackjack and ...other components

In the recent overview of the Async module I promised to write an adapter for async workflows, so that they can be consumed as implementations of the good old event-based async pattern. Let's start.

Define a class derived from AsyncCompletedEventArgs (plus a few helper functions that we will need later). It will be used as a container for the operation result.

open System.ComponentModel

type OperationCompletedEventArgs<'T>(result : 'T, ex, cancelled) =
    inherit AsyncCompletedEventArgs(ex, cancelled, null)
    member this.Result = result

let raiseSuccess f (evt : Event<_, _>) r = evt.Trigger(null, f r null false)
let raiseError f (evt : Event<_, _>) ex = evt.Trigger(null, f (Unchecked.defaultof<_>) ex false)
let raiseCancel f (evt : Event<_, _>) _ = evt.Trigger(null, f (Unchecked.defaultof<_>) null true)

let createResult res ex cancelled = new OperationCompletedEventArgs<_>(res, ex, cancelled)
let createUnit () ex cancelled = new AsyncCompletedEventArgs(ex, cancelled, null)

Declare two wrapper interfaces: the first represents a void operation, the second an operation that returns a result.

type IOperation<'TArg> =
    abstract Completed : IEvent<EventHandler<AsyncCompletedEventArgs>, _>
    abstract RunAsync : 'TArg -> unit
    abstract CancelAsync : unit -> unit

type IOperation<'TArg, 'TResult> =
    abstract Completed : IEvent<EventHandler<OperationCompletedEventArgs<'TResult>>, _>
    abstract RunAsync : 'TArg -> unit
    abstract CancelAsync : unit -> unit

These operations behave as prescribed in the article "Implementing the Event-based Asynchronous Pattern":

  • The Completed event is raised when async execution has finished, been cancelled or failed
  • The CancelAsync method signals cancellation of the running operation
  • The RunAsync method starts the async operation.

The single argument ('TArg) of the RunAsync method doesn't mean that we can adapt only one-argument computations. One generic argument is enough, since the type parameter can be substituted with a tuple of any rank :).
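A minimal sketch of the tuple trick. The `run` helper is a hypothetical stand-in for RunAsync and `describe` is a made-up workflow; neither is part of the adapter:

```fsharp
// Hypothetical stand-in for RunAsync: it knows about one generic argument only.
let run (computation : 'TArg -> Async<'TResult>) (arg : 'TArg) =
    computation arg |> Async.RunSynchronously

// A "three-argument" workflow whose arguments are packed into a single tuple.
let describe (name : string, count : int, flag : bool) = async {
    return sprintf "%s:%d:%b" name count flag }

// 'TArg is inferred as string * int * bool.
let result = run describe ("word", 2, true)
```

Here `result` should be "word:2:true"; the translate workflow below relies on the same trick with a 4-tuple.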

Finally, implementation itself:

type Async with
    static member AsEventBased(computation) =
        let cts = new CancellationTokenSource()
        let event = new Event<_, _>()
        { new IOperation<_, _> with
            member this.Completed = event.Publish
            member this.RunAsync(arg) =
                Async.StartWithContinuations(
                    computation arg,
                    raiseSuccess createResult event,
                    raiseError createResult event,
                    raiseCancel createResult event,
                    cts.Token)
            member this.CancelAsync() = cts.Cancel() }

    static member AsEventBased(computation) =
        let cts = new CancellationTokenSource()
        let event = new Event<_, _>()
        { new IOperation<_> with
            member this.Completed = event.Publish
            member this.RunAsync(arg) =
                Async.StartWithContinuations(
                    computation arg,
                    raiseSuccess createUnit event,
                    raiseError createUnit event,
                    raiseCancel createUnit event,
                    cts.Token)
            member this.CancelAsync() = cts.Cancel() }

Well, the adapter is ready, but I need to make sure that it actually works. Everybody is sick and tired of the parallel download demonstration, so I've decided to make something new: an async computation that makes requests to Google Translate through its AJAX API. No sooner said than done.

1. Google Translate returns data as JSON and BCL already includes DataContractJsonSerializer. Define data contracts:

open System.IO
open System.Web
open System.Net

open System.Runtime.Serialization
open System.Runtime.Serialization.Json

[<DataContract>]
type ResponseData = {
    [<DataMember(Name = "translatedText")>]
    mutable TranslatedText : string
    [<DataMember(Name = "detectedSourceLanguage")>]
    mutable DetectedSourceLanguage : string
    }

[<DataContract>]
type Response = {
    [<DataMember(Name = "responseData")>]
    mutable ResponseData : ResponseData
    [<DataMember(Name = "responseDetails")>]
    mutable Details : string
    [<DataMember(Name = "responseStatus")>]
    mutable Status : int
    }

2. Create a workflow:

let translate (referer, word : string, sourceLang, targetLangs) =
    let translateOne lang = async {
        let s = new DataContractJsonSerializer(typeof<Response>)

        let requestString =
            sprintf "http://ajax.googleapis.com/ajax/services/language/translate?v=1.0&q=%s&langpair=%s%%7C%s" (HttpUtility.UrlEncode word) sourceLang lang
        let req = HttpWebRequest.Create(requestString) :?> HttpWebRequest
        req.Referer <- referer
        req.Timeout <- 4000
        let! resp = req.AsyncGetResponse()
        use stream = resp.GetResponseStream()
        return s.ReadObject(stream) :?> Response
        }
    // one request per target language, all executed in parallel
    targetLangs
    |> Seq.map translateOne
    |> Async.Parallel

Remarks: Sending the request is trivial, since all arguments are encoded in the URL. If an empty string is passed in the first position of the langpair argument, the Translate service will try to autodetect the source language. The presence of a correct Referer header is mandatory (prescribed by the Google Terms of Service).
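To make the langpair rule concrete, here is a small sketch that only builds the request string. The `buildRequest` helper is hypothetical, and it uses Uri.EscapeDataString instead of HttpUtility.UrlEncode so that it does not depend on System.Web:

```fsharp
open System

// Hypothetical helper: builds the request URL only, it sends nothing.
// langpair is "<source>|<target>" with the '|' encoded as %7C.
let buildRequest (word : string) (sourceLang : string) (targetLang : string) =
    sprintf "http://ajax.googleapis.com/ajax/services/language/translate?v=1.0&q=%s&langpair=%s%%7C%s"
        (Uri.EscapeDataString word) sourceLang targetLang

// Explicit source language.
let explicitPair = buildRequest "hello world" "en" "ru"

// Empty source language: the service is asked to autodetect it.
let autodetect = buildRequest "hello world" "" "ru"
```

With an empty source language the query simply ends in `langpair=%7Cru`.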

3. Declare a helper module with language-related settings

module Languages 

let Unknown = "UNKNOWN"

type Lang =
    { Name : string
      Id : string }
    member this.IsAutodetect = this.Id = ""

let Autodetect = { Name = "AUTODETECT"; Id = ""}

let All =
    [
        {Name="BELARUSIAN" ; Id = "be"}
        {Name="BENGALI" ; Id = "bn"}
        {Name="GUJARATI" ; Id = "gu"}
        {Name="HEBREW" ; Id = "iw"}
        {Name="HINDI" ; Id = "hi"}
        {Name="HUNGARIAN" ; Id = "hu"}
        {Name="ICELANDIC" ; Id = "is"}
        {Name="INDONESIAN" ; Id = "id"}
        {Name="INUKTITUT" ; Id = "iu"}
        {Name="IRISH" ; Id = "ga"}
        {Name="ITALIAN" ; Id = "it"}
        {Name="JAPANESE" ; Id = "ja"}
        {Name="KANNADA" ; Id = "kn"}
        {Name="KAZAKH" ; Id = "kk"}
        {Name="KHMER" ; Id = "km"}
        {Name="PERSIAN" ; Id = "fa"}
        {Name="POLISH" ; Id = "pl"}
        {Name="PORTUGUESE" ; Id = "pt-PT"}
        {Name="PUNJABI" ; Id = "pa"}
        {Name="ROMANIAN" ; Id = "ro"}
        {Name="RUSSIAN" ; Id = "ru"}
        {Name="SANSKRIT" ; Id = "sa"}
    ]

let private lookup = All |> Seq.map(fun x -> (x.Id, x.Name)) |> dict
let Find n =
    match lookup.TryGetValue n with
    | true, v -> v
    | false, _ -> Unknown

I've intentionally trimmed the list because of its size; the full list of supported languages can be found here.

4. Create a client application (WPF-based). I've cheated a bit and made an F# client, but pretended that I've never heard of the Async module: all interaction with the real workflow implementation is performed through the adapter. In fact I've cheated twice: instead of crafting the UI object by object, I've defined its structure in XAML and load it with XamlReader.

type LanguageListItem = {
    mutable Checked : bool
    Language : Languages.Lang
    }

let referer = "put_your_referer here"

let win = load "MainForm.xaml" :?> Window
let input = win.FindName("input") :?> TextBox
let translateBtn = win.FindName("translate") :?> Button
let langsList = win.FindName("languages") :?> ItemsControl
let sourceLangCombo = win.FindName("sourceLanguages") :?> ComboBox

let languages = Languages.All |> List.map (fun l -> {Checked = false; Language = l})
langsList.ItemsSource <- languages

sourceLangCombo.ItemsSource <- [Languages.Autodetect] @ Languages.All
sourceLangCombo.SelectedIndex <- 0

translateBtn.Click.Add(fun _ ->
    let phrase = input.Text
    if not <| String.IsNullOrWhiteSpace(phrase) then
        let checkedLangs = languages |> List.choose(fun l -> if l.Checked then Some(l.Language) else None)
        let sourceLang = sourceLangCombo.SelectedItem :?> Languages.Lang
        let op = Async.AsEventBased(translate)

        let rec disposable = op.Completed.Subscribe(handler)
        and handler args =
            // one-shot subscription: unsubscribe on the first completion
            disposable.Dispose()
            if args.Error <> null then
                MessageBox.Show(args.Error.Message) |> ignore
            else if not <| args.Cancelled then
                showResults phrase sourceLang win (args.Result |> Seq.zip checkedLangs |> List.ofSeq)

        op.RunAsync(referer, phrase, sourceLang.Id, checkedLangs |> Seq.map(fun l -> l.Id))
    )

let app = new Application()
app.Run(win) |> ignore

The showResults function accepts the source text, the source language and the results, and displays all this information in a new window (Owner = parent win). Its signature: let showResults text (sourceLang : Languages.Lang) parentWin (results : list). The implementation is similar to the main function and is thus omitted.

XAML of the main window (UI design is not my strong point :) )

<Window xmlns="http://schemas.microsoft.com/winfx/2006/xaml/presentation"
Height="350" Width="450">
<ResourceDictionary Source="/PresentationFramework.Luna, Version=, Culture=neutral, PublicKeyToken=31bf3856ad364e35, ProcessorArchitecture=MSIL;component/themes/luna.metallic.xaml" />
<RowDefinition MaxHeight="25"/>
<RowDefinition />
<RowDefinition MaxHeight="25" />
<ColumnDefinition MaxWidth="120"/>
<TextBox Height="23" x:Name="input" Grid.Row="0"/>
<ComboBox x:Name="sourceLanguages" Grid.Row="0" Grid.Column="1">
<TextBlock Text="{Binding Path=Name}"/>
<ScrollViewer Grid.Row="1" Grid.ColumnSpan="2">
<ItemsControl x:Name="languages" >
<Border BorderThickness="1" BorderBrush="Black">
<CheckBox Margin="5,2,5,2" Content="{Binding Path=Language.Name}" x:Name="check" IsChecked="{Binding Path=Checked}"/>
<Button Content="Translate" Grid.Row="2" Grid.ColumnSpan="2" x:Name="translate" Margin="0,2,0,2"/>
<Image Grid.Column="1" Grid.Row="1" Grid.RowSpan="2" Height="150" HorizontalAlignment="Left" Margin="252,244,0,0" Stretch="Fill" VerticalAlignment="Top" Width="200" />

Main window




Sunday, February 21, 2010

Extending Unity configuration.

I first used a DI container 5 years ago (it was Spring for Java). During that first experience I was greatly impressed by the style of design it requires: a component needs to do only what the outer world expects of it. Everything else, like creating or locating dependencies, should be put outside the component. This is quite obvious, but following these simple rules brings a number of valuable benefits:

  • If a component does only what it is responsible for, the code decreases in size and becomes easier to understand
  • All dependencies of a component can be determined from its public interface (properties, constructor parameters etc.)
  • Potential reusability increases due to low coupling
  • Testability increases, especially with tools like RhinoMocks and Moq

Unity is a simple and lightweight DI container for .NET. It allows defining interconnections between components both in XML and through the IUnityContainer API. In this post I’d like to show how the configuration mechanism can be extended using the features of C# 3.0.

Sample classes and interfaces:
public interface IMyComponent
{
    void Run();
}

public interface ILogger
{
    void LogInfo(string format, params object[] args);
}

public class MyComponent : IMyComponent
{
    public string Text { get; set; }
    public ILogger Logger { get; set; }
    public void Run()
    {
        Logger.LogInfo(Text);
    }
}

public class ConsoleLogger : ILogger
{
    public void LogInfo(string format, params object[] args)
    {
        Console.WriteLine(format, args);
    }
}

As we can see, the MyComponent class is decoupled from any concrete implementation of ILogger, so it can be tested with an ILogger stub, reused with another type of logger and so on. Let’s write the code that wires all these components together.

1. Create UnityContainer and register all types in it

var container = new UnityContainer()
    .RegisterType<IMyComponent, MyComponent>(
        new InjectionMember[]
        {
            new InjectionProperty("Logger", new ResolvedParameter(typeof (ILogger),"Logger")),
            new InjectionProperty("Text", "DataSource=...")
        })
    .RegisterType<ILogger, ConsoleLogger>("Logger");

For this sample I've used the beta release of Unity (Feb 2010); in older versions, configuration of injected members is available only by calling ConfigureInjectionFor with the InjectedMembers extension.

2. Instantiate MyComponent and run operation

var myComponent = container.Resolve<IMyComponent>();
myComponent.Run();

The line "DataSource=…" should appear in the console.

Everything seems to be fine, but… IMHO this version of the code has a few flaws. First, the property is referred to by its string name, which is very error-prone. One careless rename and BOOM!!! The second inconvenience is that the property type has to be set explicitly. This is annoying and unsafe: a developer can change the type of the property, but the DI container will still try to inject a dependency based on the old type. Worse, both errors are invisible to the compiler and reveal themselves only at runtime.

Time to make some improvements. C# 3.0 already provides a type-safe way to refer to type members: expression trees. We can use them both to identify the target property and to get its type. This version will be much more resistant to errors because the compiler checks it.

In the beginning - some auxiliary types

/// <summary>
/// Accumulator of type-related settings
/// </summary>
/// <typeparam name="T"></typeparam>
public interface ITypeConfigurator<T>
{
    ITypeConfigurator<T> SetName(string name);
    ITypeConfigurator<T> SetLifetimeManager(LifetimeManager lifetimeManager);
    ITypeConfigurator<T> SetResolvedProperty<TRes>(Expression<Func<T, TRes>> expr);
    ITypeConfigurator<T> SetResolvedProperty<TRes>(Expression<Func<T, TRes>> expr, string objectName);
    ITypeConfigurator<T> SetValueProperty<TRes>(Expression<Func<T, TRes>> expr, TRes value);
}

public class TypeConfigurator<T> : ITypeConfigurator<T>
{
    public TypeConfigurator()
    {
        InjectionMembers = new List<InjectionMember>();
        LifetimeManager = new ContainerControlledLifetimeManager(); // default value
    }

    public string Name { get; private set; }
    public LifetimeManager LifetimeManager { get; private set; }
    public List<InjectionMember> InjectionMembers { get; private set; }

    ITypeConfigurator<T> ITypeConfigurator<T>.SetName(string name)
    {
        Name = name;
        return this;
    }

    ITypeConfigurator<T> ITypeConfigurator<T>.SetLifetimeManager(LifetimeManager lifetimeManager)
    {
        LifetimeManager = lifetimeManager;
        return this;
    }

    ITypeConfigurator<T> ITypeConfigurator<T>.SetResolvedProperty<TRes>(Expression<Func<T, TRes>> expr)
    {
        return AddResolvedProperty(expr, null);
    }

    ITypeConfigurator<T> ITypeConfigurator<T>.SetResolvedProperty<TRes>(Expression<Func<T, TRes>> expr, string objectName)
    {
        return AddResolvedProperty(expr, objectName);
    }

    private ITypeConfigurator<T> AddResolvedProperty<TRes>(Expression<Func<T, TRes>> expr, string objectName)
    {
        var property = GetPropertyInfo(expr);
        var propertyValue = string.IsNullOrEmpty(objectName)
            ? new ResolvedParameter<TRes>()
            : new ResolvedParameter<TRes>(objectName);
        InjectionMembers.Add(new InjectionProperty(property.Name, propertyValue));
        return this;
    }

    ITypeConfigurator<T> ITypeConfigurator<T>.SetValueProperty<TRes>(Expression<Func<T, TRes>> expr, TRes value)
    {
        var property = GetPropertyInfo(expr);
        InjectionMembers.Add(new InjectionProperty(property.Name, value));
        return this;
    }

    private static PropertyInfo GetPropertyInfo(LambdaExpression expr)
    {
        var memberExpression = expr.Body as MemberExpression;
        if (memberExpression == null)
            throw new ArgumentException("Simple member expression expected");

        var propertyInfo = memberExpression.Member as PropertyInfo;
        if (propertyInfo == null)
            throw new ArgumentException("Property access expression expected");
        return propertyInfo;
    }
}

Entry point to improved configuration mechanism can be defined as an extension method for IUnityContainer interface.

public static class UnityContainerExtensions
{
    /// <summary>
    /// Gathers type information and applies it to provided container.
    /// </summary>
    /// <remarks>
    /// I have used delegate <see cref="configurator"/> in interface for simplification
    /// because otherwise I need to create bunch of overloads
    /// RegisterType(name, SetLifetimeManager...)
    /// RegisterType(name, ...)
    /// RegisterType(...)
    /// RegisterType(SetLifetimeManager)
    /// In current version in case of need it is possible to set SetLifetimeManager and SetName with <see cref="ITypeConfigurator{T}.SetName"/>
    /// or <see cref="ITypeConfigurator{T}.SetLifetimeManager"/> methods.
    /// </remarks>
    public static IUnityContainer RegisterType<TFrom, TTo>(
        this IUnityContainer container,
        Action<ITypeConfigurator<TTo>> configurator
        ) where TTo : TFrom
    {
        var typeConfigurator = new TypeConfigurator<TTo>();
        // configurator will accumulate all settings defined by user
        configurator(typeConfigurator);

        // collected settings are applied to actual container
        return container.RegisterType<TFrom, TTo>(
            typeConfigurator.Name,
            typeConfigurator.LifetimeManager,
            typeConfigurator.InjectionMembers.ToArray()
            );
    }
}

And finally, advanced configurator in action:

var container = new UnityContainer()
    .RegisterType<IMyComponent, MyComponent>(
        c => c
            .SetResolvedProperty(_ => _.Logger, "Logger")
            .SetValueProperty(_ => _.Text, "DataSource=..")
        )
    .RegisterType<ILogger, ConsoleLogger>("Logger");

var myComponent = container.Resolve<IMyComponent>();
myComponent.Run();

Summary: unnecessary stuff removed, error-resistance: +10, additional skills: compile-time checks of property names, automatic inference of property types

Saturday, February 20, 2010

Overview of F# Async module

I’d like to dedicate this post to reviewing the functionality of the Async module: creating and manipulating async computations.

All functions can be divided into the following groups:

Auxiliary functions used in review

All code in the post was tested in FSI. Disclaimer: the samples are purely synthetic, any resemblance to real-life problems is accidental.

Functions that run computations

The "async" builder just describes an asynchronous computation without running it, so somebody has to take the first step and start execution. The Async module provides several functions that accept a definition of an async workflow and run it.

  • Async.Start – starts execution of the workflow on a thread from the thread pool
  • Async.StartImmediate – starts execution of the given workflow on the current thread
  • Async.RunSynchronously – starts execution of the given workflow and waits for its completion
  • Async.StartWithContinuations – starts execution of the given workflow and, after its completion, passes the result/exception/OperationCanceledException to one of the specified functions. If the thread that initiates execution has a SynchronizationContext installed, the final continuations will use this SynchronizationContext for posting results
  • Async.StartAsTask – starts execution of the given workflow and returns a System.Threading.Tasks.Task object that can be used to monitor the execution status and receive the result. The task should be treated in the same way as a task obtained from the TaskCompletionSource.Task property: it does not execute the action itself and can only be used for checking the status and getting the result. This function can be used for interoperability between F# and other languages

Receiving results

Async.Start and Async.StartImmediate execute computations asynchronously, so the computation itself has to define how it communicates and returns its final result. Async.RunSynchronously waits until the workflow is finished and returns its result. Async.StartWithContinuations takes a continuation for the result as one of its arguments. When using Async.StartAsTask, the result can be obtained in many ways (e.g. Task.ContinueWith). For more information please refer to the official TPL documentation.

Code samples
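A minimal sketch of three of these functions applied to the same trivial workflow. The names `work`, `received` and `finished` are made up for illustration:

```fsharp
open System.Threading

let work = async { return 6 * 7 }

// RunSynchronously blocks the caller until the result is ready.
let direct = Async.RunSynchronously work

// StartWithContinuations routes the outcome to one of three callbacks.
let received = ref 0
let finished = new ManualResetEvent(false)
Async.StartWithContinuations(
    work,
    (fun r ->
        received.Value <- r
        finished.Set() |> ignore),
    (fun _ -> finished.Set() |> ignore),   // exception continuation
    (fun _ -> finished.Set() |> ignore))   // cancellation continuation
finished.WaitOne() |> ignore

// StartAsTask exposes the workflow as a Task<int>.
let viaTask = (Async.StartAsTask work).Result
```

All three routes should deliver the same value, 42; only the way the result reaches the caller differs.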


Functions that construct computations

A computation can be defined not only with the async {} builder syntax. It is also possible to create one with two functions of the Async module.
  • Async.FromContinuations – accepts a function f that takes a tuple of 3 continuations (success, error and cancellation) and returns a computation with f as its body. f can contain arbitrary code, but the result, error or cancellation should be reported through the appropriate continuation. If no continuation is invoked, this is the final step of the workflow
  • Async.FromBeginEnd – Begin/End + IAsyncResult is a common pattern for running asynchronous operations in .NET. FromBeginEnd acts as an adapter to the async workflow interface by wrapping the provided Begin/End methods. Thus it allows using the large number of existing components that support an asynchronous mode of work

Code samples
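A minimal sketch of Async.FromContinuations wrapping a callback-style function. Both `computeWithCallback` and `doubled` are hypothetical names used only for this illustration:

```fsharp
// Hypothetical callback-style API: reports its result through a callback.
let computeWithCallback (x : int) (callback : int -> unit) = callback (x * 2)

// Wraps the callback API into an Async<int>.
let doubled x =
    Async.FromContinuations(fun (onSuccess, onError, _onCancel) ->
        // Exactly one continuation is invoked on each path.
        if x < 0 then onError (System.Exception("x must be non-negative"))
        else computeWithCallback x onSuccess)

let value = doubled 21 |> Async.RunSynchronously
```

Here `value` should be 42, while `doubled -1` would surface the exception in the calling workflow.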


Cancellation-related functions

The ability to cancel a task or a sequence of tasks is very important when dealing with asynchronous operations. The Async module uses the same approach as the TPL: CancellationTokenSource/CancellationToken. The Async module contains a default CancellationToken instance that all executing operations share by default, but all operations from the “running computation” category accept their own CancellationToken as an optional parameter. Using the CancellationTokenSource/CancellationToken combination is widely covered in the ParallelExtension samples. Library code performs intermediate checks for cancellation requests, so the developer doesn’t need to do the same in the workflow definition. Other functions used in a workflow may accept a cancellation token as an argument and perform the checks explicitly. The cancellation-related API in the Async module provides access to the default cancellation token and to the token assigned to the workflow.

  • Async.DefaultCancellationToken - returns the default cancellation token
  • Async.CancelDefaultToken - signals cancellation on the default token. Usually used outside the workflow
  • Async.CancellationToken - gets an async computation that returns the cancellation token of the current workflow. It is usually used inside a workflow to run operations that should be cancelled together with the workflow
  • Async.OnCancel - gets a computation that installs a cancellation handler. The handler is executed if cancellation is requested. OnCancel returns Async<IDisposable>, so the common usage is use! _ = Async.OnCancel(..) or let! handler = Async.OnCancel(…) ... handler.Dispose()

Code samples
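A minimal sketch of cancelling a workflow through its own token and observing the Async.OnCancel handler. The wait handles `started` and `cancelled` are added only to make the sketch deterministic:

```fsharp
open System.Threading

let cts = new CancellationTokenSource()
let started = new ManualResetEvent(false)
let cancelled = new ManualResetEvent(false)

let worker = async {
    // The handler runs if cancellation is requested while it is installed.
    use! handler = Async.OnCancel(fun () -> cancelled.Set() |> ignore)
    started.Set() |> ignore
    // Library code such as Async.Sleep checks for cancellation by itself.
    do! Async.Sleep 60000 }

// The workflow gets its own token instead of the default one.
Async.Start(worker, cts.Token)

// Cancel only after the handler has been installed.
started.WaitOne() |> ignore
cts.Cancel()
let handlerRan = cancelled.WaitOne(5000)
```

`handlerRan` is expected to be true: the workflow never wakes from its sleep, it is cancelled and the handler is notified.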


Creating adapters for interoperability with other languages

It is often very useful to consume workflow functionality outside F#. Async.AsBeginEnd makes it possible to expose a workflow as a triple of methods (Begin/End/Cancel), so it can easily be used from other .NET languages. In one of my next posts I’ll show how to extend Async with a method that creates an event-based adapter for a workflow.

Code samples


Coordination functions

Async module functions that are essential for task coordination

  • Async.AwaitEvent - creates a computation that subscribes to the given event and resumes execution when the event is raised. Returns the event arguments (an instance derived from EventArgs)
  • Async.AwaitIAsyncResult - creates a computation that waits on the provided IAsyncResult. Returns true if the IAsyncResult is signaled within the given timeout. The timeout parameter is optional; the default value is -1 (Timeout.Infinite)
  • Async.AwaitWaitHandle - creates a computation that waits on the provided WaitHandle. Returns true if the handle is signaled within the given timeout. The timeout parameter is optional; the default value is -1 (Timeout.Infinite)
  • Async.AwaitTask - creates a computation that waits on the provided task and returns its result

One method is a bit special here: Async.Sleep. It doesn’t wait on anything; instead it pauses execution and resumes it after a timeout. This operation does not block any operating system threads but uses System.Threading.Timer for scheduling

Code samples
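A minimal sketch combining Async.AwaitEvent and Async.Sleep. The event `tick` and the other names are made up for illustration:

```fsharp
open System.Threading

let tick = new Event<int>()
let finished = new ManualResetEvent(false)
let result = ref 0

let waiter = async {
    // Suspends the workflow (not a thread) until the event is raised.
    let! value = Async.AwaitEvent tick.Publish
    // Pauses without blocking an operating system thread.
    do! Async.Sleep 10
    return value * 2 }

// StartWithContinuations runs the workflow on the calling thread up to its
// first suspension, so the subscription exists before the event is raised.
Async.StartWithContinuations(
    waiter,
    (fun v ->
        result.Value <- v
        finished.Set() |> ignore),
    ignore,
    ignore)

tick.Trigger(21)
finished.WaitOne(5000) |> ignore
```

After the wait, `result.Value` is expected to be 42. Note that the order matters: raising the event before the workflow has subscribed would leave it waiting forever.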



This term is arguable, and most likely it will be changed :) By it I mean functions that operate on one or more async computations and produce a new, enhanced computation.

  • Async.Parallel - takes a sequence of async computations and returns a workflow that executes the source computations in a fork-join way. The result is an array of results
  • Async.Catch - takes a computation and returns a workflow that intercepts all exceptions within the source computation. The result is a Choice<’T, exn> that can be inspected via pattern matching
  • Async.Ignore - takes a computation and returns a workflow that executes the source computation, ignores its result and returns unit
  • Async.TryCancelled - takes a computation and a compensating action. Returns a workflow that runs the source computation; if execution is cancelled before completion, the compensating action is run

Code samples
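A minimal sketch of Async.Parallel and Async.Catch on made-up computations (`square` and `failing` are illustrative names):

```fsharp
let square x = async { return x * x }

// Fork-join: the results come back as an array in the order of the inputs.
let squares =
    [1; 2; 3]
    |> List.map square
    |> Async.Parallel
    |> Async.RunSynchronously

let failing : Async<int> = async { return failwith "boom" }

// Catch turns an exception into a value that can be pattern matched.
let outcome =
    match failing |> Async.Catch |> Async.RunSynchronously with
    | Choice1Of2 v -> sprintf "ok %d" v
    | Choice2Of2 e -> sprintf "failed: %s" e.Message
```

Here `squares` should be [|1; 4; 9|] and `outcome` should be "failed: boom".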


Functions that start subcomputation

An async workflow may start subworkflows that are executed simultaneously with the parent.

  • Async.StartChild - produces a computation that starts the given computation and returns a computation for receiving its result (yes, Async<Async<…>> :) ). If the parent computation requests the result and the child computation is not finished yet, the parent computation is suspended until the child completes
  • Async.StartChildAsTask - produces a computation that starts the given computation and returns a Task instance for monitoring its status and receiving the result

Code samples
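A minimal sketch of Async.StartChild; the arithmetic is a placeholder for real work:

```fsharp
let parent = async {
    // The first let! starts the child and returns a handle of type Async<int>.
    let! childResult = Async.StartChild (async { return 20 + 1 })
    // The parent can do its own work while the child is running.
    let local = 2
    // The second let! waits for the child if it has not finished yet.
    let! fromChild = childResult
    return local * fromChild }

let total = Async.RunSynchronously parent
```

`total` should be 42. The two-step binding is exactly the Async<Async<…>> shape mentioned above.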


Thread selection functions

Sometimes it is necessary to specify explicitly what kind of thread should be used for the next step of a computation.

  • Async.SwitchToThreadPool - creates a computation that executes its continuation on a thread from the thread pool
  • Async.SwitchToNewThread - creates a computation that executes its continuation on a new thread
  • Async.SwitchToContext - creates a computation that will post its continuation to the given syncContext

Code samples
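A minimal sketch that observes the effect of the switching functions through Thread.CurrentThread.IsThreadPoolThread (the workflow name `threads` is made up):

```fsharp
open System.Threading

let threads = async {
    // The continuation runs on a freshly created thread.
    do! Async.SwitchToNewThread()
    let onNew = Thread.CurrentThread.IsThreadPoolThread
    // The continuation runs on a thread pool thread.
    do! Async.SwitchToThreadPool()
    let onPool = Thread.CurrentThread.IsThreadPoolThread
    return onNew, onPool }

let onNew, onPool = Async.RunSynchronously threads
```

`onNew` is expected to be false and `onPool` true. Async.SwitchToContext works the same way but needs a SynchronizationContext, for example the one captured on a UI thread.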


Wednesday, February 17, 2010

Co/Contravariance for .NET 2.0+

The CLR has supported generic variance for generic interfaces and delegates since version 2.0.

ECMA 335:

The CLI supports covariance and contravariance of generic parameters, but only in the signatures of interfaces and delegate classes. Suppose we have a generic interface, which is covariant in its one generic parameter; e.g., IA`1<+T>. Then all instantiations satisfy IA`1<GenArgB> := IA`1<GenArgA>, so long as GenArgB := GenArgA using the notion from assignment compatibility. So, for example, an instance of type IA`1<string> can be assigned to a local of type type IA`1<object>.

Generic contravariance operates in the opposite sense: supposing that we have a contravariant interface IB`1<-T>, then IB`1<GenArgB> := IB`1<GenArgA>, so long as GenArgA := GenArgB.

In spite of this, co/contravariance features were only partially exposed to the users of mainstream languages. C# 2.0 added support only for covariance and contravariance in delegates. C# 4.0 goes one step further: both interfaces and delegates are supported, and it is now possible to annotate a generic parameter as either covariant (out) or contravariant (in); the compiler will verify that all usages of this parameter are consistent with the annotation.

For example:

public interface IMyComparer<in T>
{
    // contravariant generic parameter
    int Compare(T a, T b);
}

public interface IFactory<out T>
{
    // covariant generic parameter
    T Create();
}

public class MyComparer<T> : IMyComparer<T>
{
    public int Compare(T a, T b)
    {
        return 0;
    }
}

public class Factory<T> : IFactory<T>
{
    public T Create()
    {
        return default(T);
    }
}

// usage

IMyComparer<string> s = new MyComparer<object>();
IFactory<object> p = new Factory<string>();

Very nice feature! And it doesn’t require .NET 4.0 for execution: the runtime already supports variance, and the C# compiler does all the remaining work.

Note: variant versions of the standard interfaces (IEnumerable<T>, IEnumerator<T>, IQueryable<T>, IGrouping<TK, TV>, IEqualityComparer<T>, IComparer<T>, IComparable<T>) and delegates (Action<>, Func<T>, Predicate<T>, Comparison<T>, Converter<TIn, TOut>) will not be available, since the annotated versions are located in the 4.0 assemblies.

Other compiler features that are accessible from VS 2010 for .NET 2.0+

  • optional parameters
  • named parameters
class Program
{
    static void Main(string[] args)
    {
        Run(b : "value", a : 10);
    }

    public static void Run(int a, string b)
    {
        Console.WriteLine("a = {0}, b = {1}", a, b);
    }
}

Tuesday, February 16, 2010

Event-based async pattern in F#

Hi everyone, I've finally made up my mind and decided to start blogging :).

I've played with F# for a long time and I'm very impressed by its power and expressiveness. Many concepts from the language itself and from the standard library provide quick, short and elegant solutions to a number of non-trivial problems.

One such problem is running asynchronous operations. Strictly speaking, execution itself is not a big issue: in the most common case you can just queue the task to a thread from the thread pool or spawn a worker thread manually. The complexity reveals itself when you need to coordinate the execution of many operations, for example running one async operation after another, or creating a number of child tasks within an operation and resuming only when all of them are completed. The .NET BCL provides two common patterns for dealing with asynchronous operations:

1. IAsyncResult result + Begin/End methods

2. Event-based async pattern.

Both of them share one feature: they are awkward when it comes to coordination. With the IAsyncResult pattern it is possible to use callbacks to chain the execution of subsequent operations, and the properties/methods of the IAsyncResult object to check the execution status, but the code becomes an unreadable mess. Lambda expressions from C# 3.0 help a lot, but even with them a program that sequentially executes a number of async operations begins to look like a “ladder” made of lambdas. The event-based async pattern has no coordination support at all.

F# async workflows come to the rescue.

The F# standard library contains the ultimate weapon for solving problems with async operations: asynchronous workflows. They allow writing code that looks sequential but is asynchronous under the hood.

open System.IO
open System.Net

let readToEnd (reader : #TextReader) = async {
    do! Async.SwitchToThreadPool()
    return reader.ReadToEnd()
    }

let loadPageAsync (url : string) = async {
    let request = WebRequest.Create(url) :?> HttpWebRequest
    let! response = request.AsyncGetResponse()
    use stream = response.GetResponseStream()
    use reader = new StreamReader(stream)
    let! content = readToEnd reader
    return content
    }

Workflow description is sliced into pieces and each piece can be executed asynchronously one after another.


Most features of creating async workflows and using the Async module will be covered in subsequent posts; for now I’ll speak about integration with the existing patterns of asynchronous execution.

Workflows can be created based on Begin/End methods via calling Async.FromBeginEnd

use stream = response.GetResponseStream()
let buf = Array.zeroCreate 8192
let! read = Async.FromBeginEnd(buf, 0, buf.Length, stream.BeginRead, stream.EndRead)

Workflow functionality can also be exposed to other .NET languages as a set of Begin/End/Cancel methods

type PageDownloader() =
    let beginMethod, endMethod, cancelMethod = Async.AsBeginEnd(loadPageAsync)
    member this.Begin = beginMethod
    member this.End = endMethod
    member this.Cancel = cancelMethod

Unfortunately, async workflows don’t support the event-based async pattern directly, but the Async module has the AwaitEvent method that can be used.

type OperationCompletedEventArgs(r : string) =
    inherit System.EventArgs()
    member this.Result = r

type Class() =
    let completed = new Event<System.EventHandler<OperationCompletedEventArgs>, _>()
    member this.RunOperationAsync() =
        // do some job
        completed.Trigger(box this, new OperationCompletedEventArgs("done"))
        ()
    [<CLIEvent>]
    member this.OperationCompleted = completed.Publish

let runJob (c : Class) = async {
    do c.RunOperationAsync()
    let! res = Async.AwaitEvent(c.OperationCompleted)
    return res.Result
    }

Do not use this code, it is buggy!!!

Async.AwaitEvent suspends the workflow until the event is raised, so we cannot call it before RunOperationAsync. If we simply put it after RunOperationAsync (as in the sample), we get a potential race condition: the operation can finish before Async.AwaitEvent subscribes to the OperationCompleted event, and AwaitEvent will wait till the end of time. To solve this we need something similar to Subject in Rx: something that subscribes to the OperationCompleted event before RunOperationAsync and re-raises the event if the situation described above occurs. Let’s name it Subject too.

// sketch of implementation
open System

[<AutoOpen>]
module AsyncExtensions =
    type private DelegateAdapter<'T>(o : IObserver<_>) =
        static let invokeMethod = typeof<DelegateAdapter<'T>>.GetMethod("Invoke")
        static let argumentTypes = invokeMethod.GetParameters().[1..] |> Array.map (fun p -> p.ParameterType)

        static member Method = invokeMethod
        static member ArgumentTypes = argumentTypes

        member this.Invoke(args : 'T) = o.OnNext(args)

    type Async with
        static member GetSubject(event : IEvent<'D, 'T>) =
            let value = ref None
            let handlers : Delegate ref = ref null

            let sync = new obj()

            let trigger () =
                match !handlers, !value with
                | l, Some(v) ->
                    match DelegateAdapter<_>.ArgumentTypes.Length with
                    | 1 -> l.DynamicInvoke([| null; box v |]) |> ignore
                    | _ -> l.DynamicInvoke(Array.append [|null|] (Microsoft.FSharp.Reflection.FSharpValue.GetTupleFields(box v))) |> ignore
                | _ -> ()

            let rec dispose = event.Subscribe(handler)
            and handler v = lock sync (fun () ->
                value := Some(v)
                trigger()
                dispose.Dispose()
            )

            let addHandler h = lock sync (fun () ->
                handlers := h;
                trigger()
                )
            let removeHandler h = lock sync (fun () -> handlers := null)

            { new IEvent<'D, 'T>
              interface IDelegateEvent<'D> with
                    member this.AddHandler(h) = addHandler h
                    member this.RemoveHandler(h) = removeHandler h
              interface IObservable<'T> with
                    member this.Subscribe(s) =
                        let adapter = new DelegateAdapter<_>(s)
                        let handler = Delegate.CreateDelegate(typeof<'D>, adapter, DelegateAdapter<'T>.Method)
                        addHandler handler
                        { new IDisposable with
                            member d.Dispose() = removeHandler handler } }

The subject subscribes to the original event and in the simple case just delegates the call to the inner handler (the AwaitEvent subscriber). If the event is raised while the AwaitEvent handler is not yet attached, the result is stored in a field and the AwaitEvent handler receives it at the moment of subscription.

let runJob2 (c : Class) = async {
    let subject = Async.GetSubject(c.OperationCompleted) // s
    do c.RunOperationAsync()
    let! res = Async.AwaitEvent(subject)
    return res.Result
    }