Tuesday, February 16, 2010

Event-based async pattern in F#

Hi everyone, I have finally made up my mind and decided to start blogging :)

I have played with F# for a long time and I am very impressed by its power and expressiveness. Many concepts from the language itself and from the standard library provide quick, short and elegant solutions to a number of non-trivial problems.

One such problem is running asynchronous operations. Strictly speaking, execution itself is not a big issue: in the most common case you can just queue the task to a thread from the thread pool or spawn a worker thread manually. The complexity reveals itself when you need to coordinate the execution of many operations, for example running one async operation after another, or creating a number of child tasks within an operation and resuming only when all of them have completed. The .NET BCL provides two common patterns for dealing with asynchronous operations:

1. IAsyncResult + Begin/End methods

2. Event-based async pattern.

They have one feature in common: both become awkward as soon as coordination is involved. With the IAsyncResult pattern it is possible to use callbacks to chain subsequent operations and the properties/methods of the IAsyncResult object to check execution status, but the code quickly becomes an unreadable mess. Lambda expressions from C# 3.0 help a lot, but even with them a program that sequentially executes a number of async operations begins to look like a “ladder” made of lambdas. The event-based async pattern has no coordination support at all.
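To make the “ladder” concrete, here is a minimal sketch of two sequential reads chained through Begin/End callbacks. It is written in F# to stay consistent with the rest of the post. The in-memory stream and the names buf1, buf2, total and finished are assumptions made up for the illustration.

```fsharp
open System
open System.IO
open System.Threading

// Illustrative data source: an in-memory stream holding 10 bytes.
let stream = new MemoryStream([| 1uy .. 10uy |])
let buf1 : byte[] = Array.zeroCreate 4
let buf2 : byte[] = Array.zeroCreate 4
let total = ref 0
let finished = new ManualResetEvent(false)

// Each follow-up step has to live inside the callback of the previous one.
stream.BeginRead(
    buf1, 0, buf1.Length,
    AsyncCallback(fun ar1 ->
        let n1 = stream.EndRead(ar1)
        stream.BeginRead(
            buf2, 0, buf2.Length,
            AsyncCallback(fun ar2 ->
                let n2 = stream.EndRead(ar2)
                total.Value <- n1 + n2
                finished.Set() |> ignore),
            null) |> ignore),
    null) |> ignore

// Block the calling thread until the innermost callback has run.
finished.WaitOne() |> ignore
```

With only two steps the nesting is still readable, but every additional operation adds another level, and error handling has to be repeated in each callback.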

F# async workflows come to the rescue.

The F# standard library contains the ultimate weapon against problems with async operations: asynchronous workflows. They allow you to write code that looks sequential but is asynchronous under the hood.

    open System.IO
    open System.Net

    // Reads the whole text on a thread pool thread.
    let readToEnd (reader : #TextReader) = async {
        do! Async.SwitchToThreadPool()
        return reader.ReadToEnd()
        }

    // Downloads the page at the given URL without blocking while waiting for the response.
    let loadPageAsync (url : string) = async {
        let request = WebRequest.Create(url) :?> HttpWebRequest
        let! response = request.AsyncGetResponse()
        use stream = response.GetResponseStream()
        use reader = new StreamReader(stream)
        let! content = readToEnd reader
        return content
        }

The workflow description is sliced into pieces, and the pieces are executed one after another, each of them possibly asynchronously.

[Figure: sliced workflow]
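To make the slicing concrete, here is a rough sketch of how a workflow maps onto the async builder: every let! becomes a call to Bind, and everything after it becomes a continuation function. The names addAsync and addAsyncDesugared are made up for the illustration, and the real translation done by the compiler contains more detail than is shown here.

```fsharp
// A workflow written with the async { ... } syntax.
let addAsync (a : Async<int>) (b : Async<int>) = async {
    let! x = a
    let! y = b
    return x + y
    }

// Roughly the same computation written against the builder directly.
// Each lambda is one "piece" that runs after the previous step completes.
let addAsyncDesugared (a : Async<int>) (b : Async<int>) =
    async.Delay(fun () ->
        async.Bind(a, fun x ->
            async.Bind(b, fun y ->
                async.Return(x + y))))

let viaSyntax =
    addAsync (async { return 1 }) (async { return 2 }) |> Async.RunSynchronously
let viaBuilder =
    addAsyncDesugared (async { return 1 }) (async { return 2 }) |> Async.RunSynchronously
```

Both versions produce the same result. The second one is simply the “ladder” again, only this time the compiler writes it for you.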

Most features for creating async workflows and using the Async module will be covered in subsequent posts; for now I’ll talk about integration with the existing patterns of asynchronous execution.

Workflows can be created from a pair of Begin/End methods by calling Async.FromBeginEnd:

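    // fragment from inside an async { ... } block
    use stream = response.GetResponseStream()
    let buf : byte[] = Array.zeroCreate 8192
    // read is the number of bytes actually placed into buf
    let! read = Async.FromBeginEnd(buf, 0, buf.Length, stream.BeginRead, stream.EndRead)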
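The fragment above needs a surrounding workflow and a real response stream. As a self-contained sketch of the same call, the following wraps it in a helper and runs it against an in-memory stream; the name readChunk and the sample bytes are assumptions for the illustration.

```fsharp
open System.IO

// Reads up to `count` bytes from any stream by wrapping BeginRead/EndRead.
let readChunk (stream : Stream) (count : int) = async {
    let buf : byte[] = Array.zeroCreate count
    let! read = Async.FromBeginEnd(buf, 0, buf.Length, stream.BeginRead, stream.EndRead)
    // Keep only the bytes that were actually read.
    return buf.[0 .. read - 1]
    }

// Usage with an in-memory stream, so no network access is needed.
let chunk =
    use source = new MemoryStream([| 10uy; 20uy; 30uy |])
    readChunk source 8192 |> Async.RunSynchronously
```

A single read may return fewer bytes than requested, so reading a whole stream would need a loop around readChunk.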

Workflow functionality can also be exposed to other .NET languages as a pair of Begin/End methods:

    type PageDownloader() =
        // Async.AsBeginEnd returns the Begin, End and Cancel functions as a triple
        let beginMethod, endMethod, cancelMethod = Async.AsBeginEnd(loadPageAsync)
        member this.Begin = beginMethod
        member this.End = endMethod
        member this.Cancel = cancelMethod
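As a sketch of how the functions returned by Async.AsBeginEnd are meant to be called, here is the same idea with a trivial computation instead of a page download, so that it runs without network access. The name doubleAsync and the values are assumptions for the illustration.

```fsharp
// Stand-in for loadPageAsync: any function of the shape 'Arg -> Async<'T> works.
let doubleAsync (x : int) = async { return x * 2 }

let beginDouble, endDouble, cancelDouble = Async.AsBeginEnd(doubleAsync)

// What a caller following the Begin/End convention would do:
// start the operation (no callback, no state object here) ...
let asyncResult = beginDouble (21, null, null)
// ... and later collect the result; this waits until the operation has finished.
let doubled = endDouble asyncResult
```

The Begin function takes the argument together with the usual AsyncCallback and state object, which is the shape callers in C# or VB expect from this pattern.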

Unfortunately, async workflows have no built-in support for the event-based async pattern, but the Async module has the AwaitEvent method, which can be used instead.

    type OperationCompletedEventArgs(r : string) =
        inherit System.EventArgs()
        member this.Result = r

    type Class() =
        let completed = new Event<System.EventHandler<OperationCompletedEventArgs>, _>()
        member this.RunOperationAsync() =
            // do some job
            completed.Trigger(box this, new OperationCompletedEventArgs("done"))
            ()
        [<CLIEvent>]
        member this.OperationCompleted = completed.Publish

    let runJob (c : Class) = async {
        do c.RunOperationAsync()
        let! res = Async.AwaitEvent(c.OperationCompleted)
        return res.Result
        }

Do not use this code, it is buggy!!!

Async.AwaitEvent suspends the workflow until the event is raised, so we cannot await it before calling RunOperationAsync: the operation would never be started. If we simply swap the order (as in the sample), we get a potential race condition: the operation can finish before Async.AwaitEvent subscribes to the OperationCompleted event, and AwaitEvent will then wait until the end of time. To solve this we need something similar to a Subject in Rx: something that can subscribe to OperationCompleted before RunOperationAsync is called and re-raise the event if the situation described above occurs. Let’s name it Subject too.
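The root of the problem is that .NET events do not buffer anything: a notification raised before a handler is attached is simply gone. Here is a minimal sketch of that behaviour with a plain F# event; the names completed and received and the payload strings are assumptions for the illustration.

```fsharp
// A plain F# event with a string payload.
let completed = Event<string>()
let received = ResizeArray<string>()

// The event fires before anyone listens: this notification is lost.
completed.Trigger "first"

// Subscribe only now, which is what awaiting the event after starting the operation amounts to.
completed.Publish.Add(fun msg -> received.Add msg)

// Only notifications raised after the subscription are observed.
completed.Trigger "second"
```

After running this, received contains only "second". A workflow waiting for the first notification would never resume.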

    // sketch of implementation
    open System

    [<AutoOpen>]
    module AsyncExtensions =
        // Adapts an IObserver to the (sender, args) shape of an event delegate.
        type private DelegateAdapter<'T>(o : IObserver<'T>) =
            static let invokeMethod = typeof<DelegateAdapter<'T>>.GetMethod("Invoke")

            static member Method = invokeMethod

            member this.Invoke(sender : obj, args : 'T) = o.OnNext(args)

        type Async with
            static member GetSubject(event : IEvent<'D, 'T>) =
                let value = ref None
                let handlers : Delegate ref = ref null
                // number of delegate parameters after the sender, taken from 'D itself
                let argumentCount = typeof<'D>.GetMethod("Invoke").GetParameters().Length - 1

                let sync = new obj()

                // pass the stored value (if any) to the attached handler (if any)
                let trigger () =
                    match !handlers, !value with
                    | null, _ | _, None -> ()
                    | l, Some(v) ->
                        match argumentCount with
                        | 1 -> l.DynamicInvoke([| null; box v |]) |> ignore
                        | _ -> l.DynamicInvoke(Array.append [|null|] (Microsoft.FSharp.Reflection.FSharpValue.GetTupleFields(box v))) |> ignore

                // remember the first occurrence of the source event, then unsubscribe from it
                let rec dispose = event.Subscribe(handler)
                and handler v = lock sync (fun () ->
                    value := Some(v)
                    trigger()
                    dispose.Dispose()
                )

                let addHandler h = lock sync (fun () ->
                    handlers := h;
                    trigger()
                    )
                let removeHandler h = lock sync (fun () -> handlers := null)

                { new IEvent<'D, 'T>
                  interface IDelegateEvent<'D> with
                        member this.AddHandler(h) = addHandler h
                        member this.RemoveHandler(h) = removeHandler h
                  interface IObservable<'T> with
                        member this.Subscribe(s) =
                            let adapter = new DelegateAdapter<_>(s)
                            let handler = Delegate.CreateDelegate(typeof<'D>, adapter, DelegateAdapter<'T>.Method)
                            addHandler handler
                            { new IDisposable with
                                member d.Dispose() = removeHandler handler } }

The Subject subscribes to the original event and in the simple case just delegates the call to the inner handler (the AwaitEvent subscriber). If the event is raised when the AwaitEvent handler is not attached yet, the result is stored in a field and the AwaitEvent handler receives it at the moment of subscription.

    let runJob2 (c : Class) = async {
        // subscribe to the event before the operation is started
        let subject = Async.GetSubject(c.OperationCompleted)
        do c.RunOperationAsync()
        let! res = Async.AwaitEvent(subject)
        return res.Result
        }
