пятница, 2 апреля 2010 г.

Evoking by "Patterns of Parallel Programming" 1.1

In previous post I've made the sketch of Async.Any method, however code in sample should not be treated as a real-world implementation but rather as a demonstration of approach. Now we will improve existing version so the interest to it can be switched from academical to practical. Many thanks to Dmitry Lomov, as he reveals number of very serious flaws.

Let's list all imperfections of current solution:

  1. User continuation was invoked under the lock as well as (possibly) registered cancellation handlers
  2. CancellationTokenSource is not disposed after finishing the computation
  3. Exceptions are not handled at all
  4. All computations are executed in new fresh thread instead of enqueueing job to ThreadPool
  5. Base computation can already have ambient cancellation token and for now this token is ignored.

And now without further ado I'm glad to present you new version:

open System.Threading

type Async with
static member Any(asyncs : seq<Async<'T>>) =
let arr = Seq.toArray asyncs

// signals cancel only once, subsequent calls are ignored
// returns true if cancel signal issued; otherwise - false
let cancel cancelled (cts : CancellationTokenSource) =
if Interlocked.CompareExchange(cancelled, 1, 0) = 0
then cts.Cancel(); true
else false

let createComputation cancelled (cts : CancellationTokenSource) = Async.FromContinuations(fun (cont, econt, ccont) ->

let remaining = ref (int64 arr.Length)
if !remaining = 0L then failwith "Sequence should contain at least one computation"

let cancelled = ref 0

// decrements number of active computations
// disposes cancellation token source when last computation ends
// returns true if Dispose is called; otherwise - false
let decrement () =
if Interlocked.Decrement(remaining) = 0L
then cts.Dispose(); true
else false

let exceptions = new ResizeArray<_>()

let saveSuccess v =
if cancel cancelled cts
then
decrement() |> ignore
cont v
else
decrement() |> ignore

// stores exception
// calls error econt if all subcomputations were completed with errors
let saveError ex =
lock exceptions (fun () -> exceptions.Add(ex))
if decrement() && (exceptions.Count = arr.Length) then
econt(new System.AggregateException(exceptions))

for c in arr do
Async.StartWithContinuations(
async {
do! Async.SwitchToThreadPool()
return! c
},
saveSuccess,
saveError,
(fun _ -> decrement() |> ignore),
cancellationToken = cts.Token
)
)
async {
let cancelled = ref 0
let cts = new CancellationTokenSource()
use! d = Async.OnCancel(fun() -> cancel cancelled cts |> ignore)
let! r = createComputation cancelled cts
return r
}

Notes:

  1. cancel function can be called multiple times. First time leads to triggering cancellation on specified CancellationTokenSource,subsequent calls are ignored. When success continuation (saveSuccess) is invoked for the first time - it signals cancellation and delegates actual continuation.
  2. decrement function counts down when every computation ends (independently from completion type). Finally it disposes associated CancellationTokenSource.
  3. Async.OnCancel propagates externally signalled cancellation to nested computations.

Комментариев нет:

Отправить комментарий

 
GeekySpeaky: Submit Your Site!