Sunday, April 18, 2010

MailboxProcessors: Practical Application

Last time we examined the MailboxProcessor API. Now it is time to see how this concept can be applied to solve real-world problems.

Just as a quick reminder, here is the "ping-pong" sample: two mailboxes, where the first one sends a Ping request to the second one and the second replies with Pong. After the specified number of iterations, execution stops.

type Message = Ping of AsyncReplyChannel<unit> | Stop

let pong (inbox : MailboxProcessor<Message>) =
    let rec loop () = async {
        let! msg = inbox.Receive()
        match msg with
        | Ping(reply) ->
            // answer the ping, otherwise the sender waits forever
            reply.Reply()
            return! loop ()
        | Stop -> ()
        }
    loop ()

let ping = MailboxProcessor.Start(fun (inbox : MailboxProcessor<int>) ->
    // state 1: wait for the number of iterations
    let rec start () = async {
        printfn "ping::waiting for number of iterations..."
        let! n = inbox.Receive()
        printfn "ping::%d received, start sending ping" n
        do! loop n (MailboxProcessor.Start(pong))
        }
    // state 2: send a ping and wait for the reply
    and loop n (pong : MailboxProcessor<Message>) = async {
        if n > 0 then
            printfn "ping::sending ping, %d remaining" n
            do! pong.PostAndAsyncReply(Ping)
            printfn "ping::pong received"
            do! loop (n - 1) pong
        else
            // tell pong to finish as well
            pong.Post Stop
            printfn "ping::exiting"
        }
    start ()
    )

ping.Post 5

Output:

ping::waiting for number of iterations...
ping::5 received, start sending ping
ping::sending ping, 5 remaining
ping::pong received
ping::sending ping, 4 remaining
ping::pong received
ping::sending ping, 3 remaining
ping::pong received
ping::sending ping, 2 remaining
ping::pong received
ping::sending ping, 1 remaining
ping::pong received

Unlike the trivial pong function, ping has one noteworthy feature: it defines a simple state machine with two states. The first state waits for the number of iterations; the second sends a ping and waits for the reply.

The ping-pong sample works well for demo purposes but has a grave shortcoming: it is purely synthetic and useless in real life. It would be much nicer to see a sample where MailboxProcessors can show their power and capabilities...

Web Sockets

The Web sockets specification - developed as part of the HTML5 initiative - introduced the Web Socket JavaScript interface, which defines a full-duplex single socket connection over which messages can be sent between client and server. The Web Socket standard attempts to simplify much of the complexity around bi-directional web communication and connection management.

The idea behind Web Sockets is very simple: the client initiates a TCP connection and performs a handshake with the server. After the handshake completes successfully, web sockets work much like ordinary sockets. Data is sent in frames as UTF-8 text. Each frame starts with a type byte. If the high bit of the type byte is not set (0x00 to 0x7F), the data is transferred as a stream of bytes terminated with 0xFF, so a text frame starts with 0x00 and ends with 0xFF. If the high bit is set, the data must be preceded by a length indicator, which is encoded as a series of 7-bit values stored in octets, with the 8th bit set in all but the last octet.
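
The framing rules above can be sketched in a few lines of F#. This is only an illustration, not part of the server below: the names frameText and encodeLength are hypothetical, and the most-significant-group-first order of the length indicator is an assumption based on the draft protocol rather than something stated here.

```fsharp
open System.Text

/// Wraps a text message in a delimited frame: 0x00, UTF-8 payload, 0xFF.
/// (hypothetical helper, not used by the server code in this post)
let frameText (text : string) : byte[] =
    let payload = Encoding.UTF8.GetBytes text
    Array.concat [ [| 0x00uy |]; payload; [| 0xFFuy |] ]

/// Encodes a length as 7-bit groups, one group per octet.
/// Assumption: groups are written most significant first.
/// The 8th bit is set in every octet except the last one.
let encodeLength (length : int) : byte[] =
    if length < 0 then invalidArg "length" "length must be non-negative"
    // Peel off 7 bits at a time; prepending keeps the most significant group first.
    let rec collect n acc =
        let acc' = (n &&& 0x7F) :: acc
        if n < 0x80 then acc' else collect (n >>> 7) acc'
    let groups = collect length []
    let last = List.length groups - 1
    groups
    |> List.mapi (fun i g -> if i < last then byte (g ||| 0x80) else byte g)
    |> List.toArray

let framed = frameText "hi"          // [|0x00uy; 0x68uy; 0x69uy; 0xFFuy|]
let shortLength = encodeLength 5     // [|0x05uy|]
let longLength = encodeLength 300    // [|0x82uy; 0x2Cuy|]
```

The server in this post only ever sends text frames, so it needs nothing beyond the 0x00 and 0xFF delimiters; the length encoding is shown just to make the second half of the rule concrete.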

For the demonstration we will create a simple web socket server that accepts requests from a browser (Google Chrome already supports web sockets, so it will be our test animal) and periodically sends JSON-encoded messages with the current time.

Server components.

  • The request dispatcher accepts incoming connections and spawns a worker to handle each client.
  • A worker sends messages to and receives messages from a particular client. The worker also encapsulates all transport/serialization details, so external components just post messages to it.
  • The controller maintains the list of workers and broadcasts messages.
  • The timer periodically sends messages to the controller.

open System
open System.IO
open System.Net
open System.Net.Sockets
open System.Text
open System.Threading
open System.Runtime.Serialization

// DataContract is required for the DataMember names to be used in the JSON output
[<DataContract>]
type Time =
    { [<DataMember(Name = "hour")>] mutable Hour : int
      [<DataMember(Name = "minute")>] mutable Minute : int
      [<DataMember(Name = "second")>] mutable Second : int }
    static member New(dt : DateTime) = { Hour = dt.Hour; Minute = dt.Minute; Second = dt.Second }

type Msg =
    | Connect of MailboxProcessor<Time>
    | Disconnect of MailboxProcessor<Time>
    | Tick of Time

let startMailboxProcessor ct f = MailboxProcessor.Start(f, cancellationToken = ct)

// posts the current time to the controller every `interval` milliseconds
let timer (ctrl : MailboxProcessor<Msg>) interval = async {
    while true do
        do! Async.Sleep interval
        ctrl.Post(Tick <| Time.New(DateTime.Now))
    }

let runController (ct : CancellationToken) = startMailboxProcessor ct (fun (inbox : MailboxProcessor<Msg>) ->
    let listeners = new ResizeArray<_>()
    async {
        while not ct.IsCancellationRequested do
            let! msg = inbox.Receive()
            match msg with
            | Connect l -> listeners.Add(l)
            | Disconnect l -> listeners.Remove(l) |> ignore
            | Tick msg -> listeners.ForEach(fun l -> l.Post msg)
        }
    )

let runWorker (tcp : TcpClient) (ctrl : MailboxProcessor<Msg>) (ct : CancellationToken) = ignore <| startMailboxProcessor ct (fun (inbox : MailboxProcessor<Time>) ->
    let rec handshake () = async {
        let ns = tcp.GetStream()
        let reader = new StreamReader(ns)
        // read request lines up to the first empty line
        let lines =
            reader.ReadLine()
            |> Seq.unfold(fun l -> if String.IsNullOrEmpty l then None else Some(l, reader.ReadLine()))
            |> Seq.toArray

        match lines with
        | [| "GET /timer HTTP/1.1"; "Upgrade: WebSocket"; "Connection: Upgrade"; _; _|] ->
            // TODO : parse WebSocket-Origin and WebSocket-Location

            // send server handshake part; header lines end with CRLF,
            // and an empty line terminates the handshake
            let serverHandshakePart =
                [ "HTTP/1.1 101 Web Socket Protocol Handshake"
                  "Upgrade: WebSocket"
                  "Connection: Upgrade"
                  "WebSocket-Origin: file://"
                  "WebSocket-Location: ws://localhost/timer"
                  ""
                  "" ]
                |> String.concat "\r\n"
                |> Encoding.ASCII.GetBytes

            do! ns.AsyncWrite(serverHandshakePart)
            return! run ns
        | _ ->
            //validation failed - close connection
            tcp.Close()
        }
    and run (ns : NetworkStream) = async {
        let json = System.Runtime.Serialization.Json.DataContractJsonSerializer(typeof<Time>)
        ctrl.Post(Connect inbox)
        while not ct.IsCancellationRequested do
            let! time = inbox.Receive()

            let ms = new MemoryStream()
            json.WriteObject(ms, time)

            // text frame: 0x00, UTF-8 payload, 0xFF
            do ns.WriteByte(byte 0x00)
            do! ns.AsyncWrite(ms.ToArray())
            do ns.WriteByte(byte 0xFF)
        ctrl.Post(Disconnect inbox)
        }
    handshake ()
    )

let runRequestDispatcher () =
    let listener = new TcpListener(IPAddress.Loopback, 80)
    let cts = new CancellationTokenSource()
    let token = cts.Token

    let controller = runController token
    Async.Start (timer controller 1000, token)

    let main = async {
        // the listener must be started before it can accept clients
        listener.Start()
        while not cts.IsCancellationRequested do
            let! client = Async.FromBeginEnd(listener.BeginAcceptTcpClient, listener.EndAcceptTcpClient)
            runWorker client controller token
        }

    Async.Start(main, token)

    { new IDisposable with member x.Dispose() = cts.Cancel() }

let dispose = runRequestDispatcher ()
printfn "press any key to stop..."
Console.ReadKey() |> ignore
dispose.Dispose()


<html>
<head>
<script type="text/javascript">
var ws = new WebSocket("ws://localhost/timer");
ws.onopen = function() { alert("Opened"); };
ws.onclose = function() { alert("Closed"); };
ws.onmessage = function(evt) {
    var date = JSON.parse(evt.data);
    hour.innerText = date.hour;
    minute.innerText = date.minute;
    second.innerText = date.second;
};
</script>
<style type="text/css">
table, td, th {
    font-family:"Trebuchet MS", Arial, Helvetica, sans-serif;
    border:1px solid #A7C942;
    padding:3px 7px 2px 7px;
}
</style>
</head>
<body>
<table>
<tr>
<td id="hour"></td>
<td id="minute"></td>
<td id="second"></td>
</tr>
</table>
</body>
</html>

I opened three Chrome windows, thus creating three workers. As we can see, the timer values in all browsers change simultaneously.




4 comments:

  1. Vladimir, I've modified this example so that it works with the revised WebSockets protocol that has a challenge response built into it. I was going to post it, but it doesn't format very well in the preview. Is there some other way I can send it?

  2. For example, you can use https://gist.github.com/ and put a link to the source code in a comment.

  3. The revised code is at...

