Tuesday, April 10, 2012

F# Agents and SQLCLR

Recently, I was presented with a business problem: two different MSSQL databases need to be updated and kept in sync as quickly as possible. This means that when data is inserted into a table in one database, that same data needs to be replicated in the corresponding table of the other database. Oh, and I can't modify the application that's making the inserts to one of the databases, because it's legacy code that uses ADO (not ADO.NET). :-(
Okay, so the obvious solution is to have a service wake up on a scheduled interval, poll the database table, figure out what data has changed, keep track of it, and update the other table. That would certainly work, but I want a more reactive result. Not to mention that polling is arguably inefficient and requires a great deal of overhead to keep track of changes. So I did a little research on reactive approaches. A quick Google search revealed SqlDependency and Change Tracking, which are both good approaches but not really what I had in mind. So I did what any functional programmer would do: I went to the blackboard and started to draw boxes and functional arrows. I quickly realized that all I needed was something (perhaps an Agent or Actor) to listen for changes (or a message) from one table and update the other table with the received message. Hmmmm... so how do I send a message to an Agent from MSSQL? And then it hit me... SQLCLR. I remember reading about it some time ago back in 2005. I even remember writing a stored procedure in C# just because I could. That was basically the extent of my knowledge on the subject. However, things are different now... it's 2012 and I have the power of F# at my disposal. :-)
I know I'm going to need a trigger so that I can get access to the inserted table, which I'll need to parse and send to my Agent. So how do I write an F# SQLCLR trigger? No clue, but a quick Google search revealed an excellent blog post on how to write an F# SQLCLR stored procedure. All I had to do was tweak the process a bit to create a trigger instead. Lo and behold, the F# trigger was born.


open System
open System.Data
open System.Data.Sql
open System.Data.SqlClient
open System.Data.SqlTypes
open System.IO
open System.Net
open System.Runtime.Serialization.Formatters.Binary
open Microsoft.SqlServer.Server
open System.Text

type Sample =
    [<SqlTrigger>]
    static member SendIt() =
        let triggContext = SqlContext.TriggerContext

        // The context connection gives the trigger access to the inserted pseudo-table.
        use conn = new SqlConnection("context connection=true")
        conn.Open()

        use sqlComm = conn.CreateCommand()
        sqlComm.CommandText <- "SELECT fieldname, fieldname from inserted" // placeholder column names

        use dr = sqlComm.ExecuteReader()

        let inserted_data =
            seq {
                    while dr.Read() do
                        yield (dr.[0] :?> string) + "," + (dr.[1] :?> string)
                }
        // Create a request using a URL that can receive a post.
        let request = WebRequest.Create ("http://localhost:50245/")
        // Set the Method property of the request to POST.
        request.Method <- "POST"
        // Create POST data and convert it to a byte array.
        let postData = Seq.head(inserted_data)
        let bf = new BinaryFormatter()
        use ms = new MemoryStream()
        bf.Serialize(ms, postData)
        let byteArray = ms.ToArray()
        // Set the ContentType property of the WebRequest.
        request.ContentType <- "application/x-www-form-urlencoded"
        // Set the ContentLength property of the WebRequest.
        request.ContentLength <- byteArray.Length |> int64
        // Get the request stream.
        let dataStream = request.GetRequestStream()
        // Write the data to the request stream.
        dataStream.Write (byteArray, 0, byteArray.Length)
        // Close the Stream object.
        dataStream.Close ()
        // Send that puppy!!
        do request.GetResponse() |> ignore


It's worth noting that the trigger code above is not the final code that got used; it's merely the result
of a 15-20 minute exploration session in FSI. Definitely one of the reasons why I love F#.
So that's cool... we have this trigger that we can install in MSSQL 2005 or 2008. Oh, and I should
warn you if you're interested in doing this sort of thing: you must target your F# code at .NET 2.0,
because the SQLCLR in those versions of MSSQL only understands .NET 2.0 assemblies.

Okay, so if you're still with me, you're probably wondering what's behind the address in the web request above. http://localhost:50245 is an available port on the same machine where one of the databases is hosted. Behind that address is an HttpListener, and behind that is an F# Agent. Some time ago Tomas wrote a really cool HttpAgent and demoed it over at Skills Matter.

You can head over to his blog to grab the code. The code below gives you an idea of how you might implement a listener to accept the trigger request.


open System
open System.IO
open System.Net
open System.Net.Mail
open System.Net.NetworkInformation
open System.Runtime.Serialization.Formatters.Binary
open System.Threading
// HttpAgent comes from the code on Tomas's blog mentioned above

let generate_next_available_tcpPort =
    let properties = IPGlobalProperties.GetIPGlobalProperties()
    properties.GetActiveTcpConnections()
    |> Seq.map (fun (x: TcpConnectionInformation) -> x.LocalEndPoint.Port)
    |> Seq.max                // highest local port currently in use
    |> fun port -> port + 1   // one past it



let url = sprintf "http://localhost:%i/" generate_next_available_tcpPort

let convert_stream_toBytes (stream: Stream) =
    let memoryStream = new MemoryStream();
    stream.CopyTo(memoryStream);
    memoryStream.ToArray()

(* Computation for sending email *)
let sendMail (from: string) (To: string) (subject: string) (body: string) =
    let client = new SmtpClient(Host = "localhost", DeliveryMethod = SmtpDeliveryMethod.Network, Port = 25)
    let mm = new MailMessage()
    mm.From <- new MailAddress(from)
    mm.To.Add(To)
    mm.Subject <- subject
    mm.Body <- body
    client.SendAsync(mm, null)


(* Http Listener *)
let server = HttpAgent.Start(url, fun server -> async {
    while true do
        let! ctx = server.Receive()
        ctx.Response.Reply("Message Forwarded")
        let stream = ctx.Request.InputStream
        let memStream = new MemoryStream()
        let binForm = new BinaryFormatter()
        let arrBytes = convert_stream_toBytes stream
        memStream.Write(arrBytes, 0, arrBytes.Length)
        memStream.Seek((0 |> int64), SeekOrigin.Begin) |> ignore
        let obj: string = binForm.Deserialize(memStream) |> fun x -> x :?> string
        sendMail "noreply@F#Agent.com" "destination@destination.com" "BackEnd SQL Message" obj})
           
// Stop the HTTP server and release the port
server.Stop()



Very cool! We're able to receive the request from a SQLCLR trigger.
Note: You'll notice on the second line of the agent loop that I'm immediately returning the response back to MSSQL so that the trigger can complete, thus allowing the data to actually be inserted. The time it takes the MSSQL trigger to hit the port and get the reply back is about one second, which is more than acceptable in my scenario. However, be careful not to do all of the agent work first and only then return the response, because the MSSQL insert statement could time out. If the per-message work ever gets heavier, the sketch below shows one way to keep the reply fast.
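One option is to hand the string off to a plain F# MailboxProcessor so the HTTP reply is never held up by the processing. This isn't part of the code above; it's just a minimal sketch that reuses the sendMail function defined earlier.


// Sketch only: a background agent that does the slow work off the request path.
let mailWorker =
    MailboxProcessor<string>.Start(fun inbox ->
        let rec loop () = async {
            let! body = inbox.Receive()
            sendMail "noreply@F#Agent.com" "destination@destination.com" "BackEnd SQL Message" body
            return! loop () }
        loop ())

// Inside the HttpAgent loop, right after ctx.Response.Reply("Message Forwarded"),
// you would simply post and move on:  mailWorker.Post(obj)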
Once I received the raw data as a string, I simply emailed it to myself just to show that I could do whatever I wanted with the data once it was inside my Agent. So far the approach has turned out to be a really nice alternative to polling. And the future possibilities are endless now that we're able to get the data as it's being inserted, updated, or deleted. :-)

Until next time...








Tuesday, March 20, 2012

Fa Sho!

What does Fa Sho! mean? Well, if you consult Google or the Urban Dictionary you might find that it means
"For Sure!" In this case it means to use Sho with F#. After watching the cool demo some time ago, I started playing around with Sho from time to time. I've come to realize that Sho does an awful lot of useful and interesting things. It's almost like FSChart on steroids. Recently I bought Visualize This and decided to give some of the samples a go using F# and Sho, err, Fa Sho! :-)

But before I could get to the samples I had to get my feet wet with the library. Fortunately, F# is amazing at letting me do that so I ported the C# sample code to a much shorter version illustrated below.



#I @"C:\Program Files (x86)\Sho 2.0 for .NET 4\bin"
#r "ShoArray.dll"
#r "ShoViz.dll"
#r "MathFunc.dll"
#r "MatrixInterf.dll"


open ShoNS.Array
open ShoNS.MathFunc
open ShoNS.Visualization


let x = ArrayRandom.RandomDoubleArray(10,10) 
in x * x.T


let figure = ShoPlotHelper.Figure() in figure.Bar(x)

Which produces the following output.

[Bar chart figure omitted]

If you were to look at the sample C# code provided in the download and compare it with this, you'd definitely be able to see why F# is supreme (IMO). :-)

Until next time...




Simple Data Pipelines in F# Part 2

My apologies for the extremely long delay between posts. So many prototypes have emerged since my last post.
It's one of the reasons why I love my job. I have the freedom to work with and prove out all different kinds of technologies like Mono, Erlang, OCaml, and my latest favorite, Haskell. Thanks to these lecture videos and this really great book I scored for $8.00 at a second-hand book store, I'm actually starting to understand Haskell! But enough about me... let's get back to this simple data pipeline.

If you recall, in the last post we laid out the pipeline steps as distinct function calls or applications:
Fetch -> Filter -> Format -> Archive
Having the pipeline steps laid out like this gives us a nice way to plug in new logic at any time, which leads to a number of good things, especially a clean way to maintain the app and introduce new functionality.
Which brings me to how I initially implemented this functionality. In order to have that flexibility I need a way to dynamically load some logic that can be executed, with exceptions handled along the way. Late binding comes to mind... so basically I'll need to use reflection to load an assembly and execute some code. I'll do this by introducing an abstraction for each pipeline step, starting with Fetch.


type IFetchData = 
    abstract FetchData: PipelineContext -> System.Collections.IEnumerable


type IApplyFilters = 
    abstract ApplyFilters : PipelineContext -> System.Collections.IEnumerable -> System.Collections.IEnumerable


type IFormatData = 
    abstract FormatData : PipelineContext -> System.Collections.IEnumerable -> unit


type IArchiveData  = 
    abstract ArchiveData : PipelineContext -> unit

Remember, the PipelineContext is just an F# record which holds a Dictionary<TKey,TValue>: the key is the pipeline step name and the value is the result of that specific pipeline step's computation. The context gets passed through the pipeline so that each step has access to what's been done in the pipeline thus far. A rough sketch of its shape is below, and then we'll look at how we might implement the fetch pipeline step.
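The record itself isn't shown in these posts, so this is just an assumption, reconstructed from how ctx.config and ctx.accum are used in the snippets that follow:


open System.Collections.Generic

// Assumed shape only -- inferred from how ctx.config and ctx.accum are used below.
type PipelineContext =
    { config : Dictionary<string, obj>   // per-job configuration (connection string, dates, ...)
      accum  : Dictionary<string, obj> } // pipeline step name -> result of that step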

In F# we'd simply express something like the following:


open System
open System.Data
open System.Data.SqlClient

type SampleFetchData() =

    // Assumes a record type with string fields Name and ActiveDate is defined elsewhere in the project.
    let map (reader: IDataReader) (fieldname: string) : string =
        let index = reader.GetOrdinal(fieldname)
        if not <| reader.IsDBNull(index) then
            reader.[fieldname].ToString()
        else
            "0" //means null in the config

    interface IFetchData with
        member self.FetchData ctx =
            let start_end =
                match ctx.config.ContainsKey("StartDate") && ctx.config.ContainsKey("EndDate") with
                | true  -> ctx.config.Item "StartDate" :?> String, ctx.config.Item "EndDate" :?> String
                | false -> "04/01/2011", "12/01/2011 11:59:59"

            let fetch_query =
                String.Format(@"SELECT t.Name, t.ActiveDate
                                FROM   Table t
                                WHERE  t.ActiveDate BETWEEN '{0}' AND '{1}'",
                              fst start_end, snd start_end)

            seq {
                use connection = new SqlConnection(String.Format("Data Source={0}", ctx.config.Item "connString"))
                connection.Open()
                use command = new SqlCommand(fetch_query, connection)
                use reader = command.ExecuteReader()
                while reader.Read() do
                    yield { Name = map reader "Name"
                            ActiveDate = map reader "ActiveDate" }
                } :> System.Collections.IEnumerable



Okay, that's kind of cool! It allows us to fetch data any way we want, so long as we return it as an IEnumerable. The example above happens to hit SQL, but there's nothing stopping us from hitting MongoDb or RavenDb, or even a flat file, as in the sketch below. Now that we've implemented the code for Fetch we just need to build it as a .dll and the pipeline infrastructure will pick it up and execute it!
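For instance, a hypothetical flat-file plugin (assuming the same Name/ActiveDate record used by SampleFetchData, and a made-up "sourceFile" config key) might look something like this:


// Hypothetical plugin: fetch rows from a comma-delimited text file instead of SQL.
// The "sourceFile" config key and the two-column file layout are assumptions for this sketch.
type FileFetchData() =
    interface IFetchData with
        member self.FetchData ctx =
            let path = ctx.config.Item "sourceFile" :?> string
            seq {
                for line in System.IO.File.ReadAllLines(path) do
                    match line.Split(',') with
                    | [| name; activeDate |] -> yield { Name = name; ActiveDate = activeDate }
                    | _ -> ()   // skip malformed lines
                } :> System.Collections.IEnumerable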

We're almost there; we just need to look at the code behind construct_fetch() from the first post.


open System
open System.Reflection

module PipelineBuilder =

     //Reflection Helpers
     let getinstance (instance: Type) = Activator.CreateInstance(instance)

     let get_referenced_assm (asmName: string) =
        AppDomain.CurrentDomain.GetAssemblies()
        |> Seq.filter(fun (asm: Assembly) -> asm.GetName().Name = asmName)
        |> Seq.head

     let fetch_types (contract: string) (defAsm: Assembly) =
        let matches = seq { for t in defAsm.GetTypes() do
                                if t.GetInterface(contract) <> null then
                                    yield (t, t.GetInterface(contract)) }

        if Seq.length(matches) > 0 then
            let concreteType, contract = Seq.head(matches)
            Some(concreteType, contract)
        else
            None

     (* Fetch Plugin *)
     let construct_fetch() : (PipelineContext -> PipelineResult option) =
         fun (ctx: PipelineContext) ->
            let pipeline_step = fetch_types "IFetchData" (get_referenced_assm "SampleFetchData")
            match pipeline_step with
            | Some(t, _) ->
                try
                    let instance = getinstance t :?> IFetchData
                    let result = instance.FetchData ctx
                    ctx.accum.Add("FetchData", result)
                    Some(PipelineStepSuccess(ctx))
                with ex ->
                    let msg = PipelineStepException(ex.Message) :?> PipelineStepException
                    Some(PipelineStepFailure(msg))
            | None -> None




I've highlighted the important part and omitted the other pipeline steps, since their implementation is very similar to construct_fetch() above. Allow me to frame out what's going on here: construct_fetch() takes no arguments and returns a function which accepts one argument (a PipelineContext) and returns either PipelineStepSuccess or PipelineStepFailure as a PipelineResult, which is expressed for clarity below.

exception PipelineStepException of string

type PipelineResult =
    | PipelineStepSuccess of PipelineContext
    | PipelineStepFailure of PipelineStepException
    | EmptyPipelineResult
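To make the round trip concrete, here's a small sketch (the function name is mine; everything else comes from the snippets above) of how a caller might unwrap the PipelineResult option that construct_fetch() hands back:


// Sketch: decide which PipelineContext to carry forward based on a step's result.
let nextContext (previous: PipelineContext) (result: PipelineResult option) =
    match result with
    | Some(PipelineStepSuccess ctx) -> ctx            // the step ran; use its updated context
    | Some(PipelineStepFailure ex) ->
        printfn "pipeline step failed: %A" ex
        previous                                      // keep the old context
    | Some EmptyPipelineResult | None -> previous     // plugin missing or nothing produced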
   

That's essentially all there is to it. Plugins like Fetch can be written in F# or C#, which keeps the
team members happy: they can continue writing C# while I ease them into F#.
I think the last post in this series will be a link to all the code. :-)

Until next time...

Wednesday, January 4, 2012

Simple Data Pipelines in F#


One of the first tasks that I tackled at my new gig was the redesign of a server-side component that simply exported data out of SQL Server 2008, ran some custom filtering code, and wrote the data to a file in one of three pre-determined formats. One of the properties of functional programming that I enjoy the most is modeling programs the way I think about the problem. So anyone who has done some functional programming before can probably see that the steps mentioned above would/should not be difficult to implement at all using a functional language (and you'd be right). However, it took several days of investigation and dissecting some C#, XML config files, stored procedures, conversations, etc. just to arrive at the steps above. Rather than bore you with the details of the existing system and how it currently works, we'll just say that there were three very important objectives the replacement had to meet to be considered.

1) It must be able to complete the entire process in less time than the current system (10-15 min).
2) It must be more flexible than the current system. The current system makes custom sproc calls to pull back data in a data reader, then line by line determines whether it should keep the row or skip it (the filter step), then formats the row and appends it to a StringBuilder for writing out to a file later.
3) Finally, it needs to be something that can run in a context other than a Windows service.

In my mind I'm thinking that F# can handle all of those hands down. On the other hand... what approach is more flexible than the current system while also giving me the 1st and 3rd objectives?

So I did what any self-respecting developer would do... research. :-)
To my surprise I came across a brilliant strategy by Brett Slatkin and the Google App Engine Team known as appengine-pipelines. This is, in my opinion, an amazing API built to support languages such as Python and Java in the App Engine environment. The best part is that it's open source, and there is an excellent video detailing what it is, how it's used, etc. Now that I know I'm going to use this pipeline strategy, how am I going to construct this thing? I need a very simple way to build a pipeline of actions/functions... there are a ton of ways I could go here to represent a pipeline as a data structure or collection of data structures, but I really want something that is almost trivial yet easy to explain to C# developers who are trying to learn F#. After that I can expand on the trivial design a bit. So without getting too far ahead of myself in terms of design, I decided to go with a straight-up list. That way each element in the list represents a function (pipeline step) to execute. All we need now is a way to recurse through the pipeline steps and execute each one... for that I'll use a simple recursive loop. So as a first attempt I produced the following bit of code.


open System
open System.Collections.Generic

type PipelineStages = | Fetch | Filter | Format | Archive

let Id = Guid.NewGuid().ToString() //Unique Job Id
(* Mock pipeline -> later we'll construct this from an external source *)
let pipeline = [(Id,Fetch); (Id,Filter); (Id,Format); (Id,Archive)]

let executePipeline pipeline =
   let rec execute pipeline accum =
       match pipeline with
       | [] -> printf "pipeline complete!"
       | h::t ->
            match h with
            | _, Fetch ->                    // the job id isn't needed inside the loop
                    printf "fetching data from SQL..."
                    let f = construct_fetch()
                    let result = f()
                    match result with
                    | Some(c) -> execute t c
                    | None -> execute t accum
            | _, Filter ->
                    printf "filtering data..."
                    let f = construct_filter()
                    let result = f(accum)
                    match result with
                    | Some(c) -> execute t c
                    | None -> execute t accum
            | _, Format ->
                    printf "formatting data..."
                    let f = construct_formatter()
                    let result = f(accum)
                    match result with
                    | Some(c) -> execute t c
                    | None -> execute t accum
            | _, Archive ->
                    printf "archiving file..."
                    let f = construct_archiver()
                    let result = f(accum)
                    match result with
                    | Some(c) -> execute t c
                    | None -> execute t accum

   execute pipeline (new Dictionary<string, obj>())


Again, the code above takes a very simple (almost trivial) approach to building a sequence of steps that can be run in any order. This definitely helps us get closer to objective number 2 above. It simply breaks down the process into tiny pieces and allows us to do some interesting things at each stage, like
recording the run-time of a given pipeline step, capturing the inputs and outputs of a pipeline step, running a
subset of the pipeline (for instance just two stages instead of four), etc. In the next couple of posts I'll show how I've implemented all of these features and how I've managed to get it to run in a Windows service and an ASP.NET MVC3 / F# application.


I'm omitting a few details that I'll explain more about next time... for instance:
- construct_fetch()
- construct_filter()
- construct_formatter()
- construct_archiver()
These are all functions that compute functions dynamically from other assemblies using reflection.

- accum -> is a generic dictionary which passes the results from each pipeline stage along, much like
ASP.NET does with its HttpRequest object. Another way to think about this is as a simple fold that
threads the generic dictionary through as the accumulator; see the sketch below.
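That fold view can be written down directly. Here's a minimal sketch, assuming each step is simplified to a function from the dictionary to an optional updated dictionary:


open System.Collections.Generic

// Sketch only: the same accumulator threading expressed as a fold.
// Each step returns Some(updatedAccum) on success, or None to leave the accumulator unchanged.
let applyStep (accum: Dictionary<string, obj>) (step: Dictionary<string, obj> -> Dictionary<string, obj> option) =
    match step accum with
    | Some(updated) -> updated
    | None -> accum

let runPipeline steps =
    List.fold applyStep (new Dictionary<string, obj>()) steps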

Until next time...
-Develop with passion


Introducing the last expression

As you can probably tell from the title of this blog, my posts here will only be about things related to Functional Programming. Since I was introduced to the concept three and a half years ago it has completely changed my life as a programmer. So much so, in fact, that I have left a completely comfortable job in OOP (C#) building bleeding-edge WPF/Silverlight software for this new-found way of developing software.
That's right, I left the client side and I'm now developing for the server side. Most of my work these days involves functional / computational thinking for designing server-side components that are blazing fast.
I'm also still developing some client-side stuff using ASP.NET MVC and a couple of really great frameworks to go along with it, my current favorite being Twitter Bootstrap. It's really great at helping me reach a clean and modern look for the F# web apps that I'm building. More to come on that.
Overall you can expect to see all kinds of functional programming code that I play around with or use in my daily work to solve real-world problems. So expect to see lots of F# (of course), some Erlang, OCaml,
Python, and, since I finally downloaded GHC, some really basic Haskell. These are all great languages that help me foster my ideas into something tangible that I can turn around and use in a real-world system.

In case you were wondering about the name of this blog and where it came from... it's something that has always made me feel warm and fuzzy inside ever since I first realized that the last expression in a function is automatically returned in most functional languages, including the one I was using to learn FP. I was overjoyed to discover this little nugget for the first time, and... I really hated writing return in C#. :-)
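A trivial example makes the point: the value of the last expression is the value of the function, no return keyword required.


// In F#, the last expression evaluated is the function's return value.
let add x y = x + y        // evaluates to (and returns) x + y

// The C# habit, by contrast:  int Add(int x, int y) { return x + y; }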

Until next time....
-Develop with passion