Wednesday, January 4, 2012

Simple Data Pipelines in F#


One of the first tasks I tackled at my new gig was redesigning a server-side component that exported data out of SQL Server 2008, ran some custom filtering code, and wrote the data to a file in one of three predetermined formats. One of the properties of functional programming I enjoy most is being able to model a program the way I think about the problem. Anyone who has done some functional programming can probably see that the steps above should not be difficult to implement in a functional language (and you'd be right). However, it took several days of investigation and dissecting C#, XML config files, stored procedures, conversations, etc. just to arrive at those steps. Rather than bore you with the details of how the existing system works, I'll just say that the replacement had to meet three very important objectives:

1) It must complete the entire process in less time than the current system (10-15 minutes).
2) It must be more flexible than the current system. The current system makes custom sproc calls to pull back data in a data reader, decides row by row whether to keep or skip each row (the filter step), then formats the row and appends it to a StringBuilder to be written out to a file later.
3) It must be able to run in a context other than a Windows service.

In my mind F# could handle all of those hands down. On the other hand... what approach is more flexible than the current system and still meets the first and third objectives?

So I did what any self-respecting developer would do... research. :-)
To my surprise I came across a brilliant strategy by Brett Slatkin and the Google App Engine team known as the App Engine Pipeline API. In my opinion it is an amazing API, built to support Python and Java within the App Engine environment. The best part is that it's open source, and there is an excellent video detailing what it is, how it's used, etc.

Now that I knew I was going to use this pipeline strategy, how was I going to construct the thing? I needed a very simple way to build a pipeline of actions/functions. There are a ton of ways to represent a pipeline as a data structure (or a collection of data structures), but I really wanted something almost trivial and easy to explain to C# developers who are trying to learn F#. After that I can expand on the trivial design a bit. So, without getting too far ahead of myself in terms of design, I decided to go with a straight-up list, where each element represents a function (pipeline step) to execute. All we need then is a way to walk through the pipeline steps and execute each one, and for that I'll use a simple recursive loop. Here is my first attempt:


open System
open System.Collections.Generic

type PipelineStages = | Fetch | Filter | Format | Archive

let Id = Guid.NewGuid().ToString() // Unique job id
(* Mock pipeline -> later we'll construct this from an external source *)
let pipeline = [(Id, Fetch); (Id, Filter); (Id, Format); (Id, Archive)]

// construct_fetch, construct_filter, construct_formatter and construct_archiver are
// defined elsewhere (see the notes below). Each returns a stage function whose result
// is Some(updated dictionary), or None to keep the current dictionary.
let executePipeline pipeline =
    let rec execute pipeline (accum: Dictionary<string, obj>) =
        match pipeline with
        | [] -> printfn "pipeline complete!"
        | h :: t ->
            // The job id is not needed to pick a stage, so it is matched with _.
            // (Writing Id here would not compare against the job id above; an
            // identifier in a pattern just binds a new variable.)
            match h with
            | _, Fetch ->
                printfn "fetching data from SQL..."
                let f = construct_fetch()
                match f() with
                | Some(c) -> execute t c
                | None -> execute t accum
            | _, Filter ->
                printfn "filtering data..."
                let f = construct_filter()
                match f accum with
                | Some(c) -> execute t c
                | None -> execute t accum
            | _, Format ->
                printfn "formatting data..."
                let f = construct_formatter()
                match f accum with
                | Some(c) -> execute t c
                | None -> execute t accum
            | _, Archive ->
                printfn "archiving file..."
                let f = construct_archiver()
                match f accum with
                | Some(c) -> execute t c
                | None -> execute t accum

    execute pipeline (Dictionary<string, obj>())
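Since the construct_* functions are left out of the post, the loop above cannot run on its own. Below is a minimal, self-contained sketch of the same idea that can be pasted into F# Interactive. `constructStage` and the sample rows are hypothetical stand-ins for `construct_fetch()` and friends (which the post builds via reflection). One assumption differs from the original: every stage, including fetch, takes the accumulator, so the four identical branches collapse into one.

```fsharp
open System
open System.Collections.Generic

type PipelineStages = | Fetch | Filter | Format | Archive

// Hypothetical stand-in for construct_fetch/construct_filter/construct_formatter/
// construct_archiver: maps each stage tag to a function over the shared dictionary.
let constructStage (stage: PipelineStages) : Dictionary<string, obj> -> Dictionary<string, obj> option =
    match stage with
    | Fetch ->
        fun acc ->
            acc.["rows"] <- box [ "a,1"; "b,2"; "c,3" ] // fake rows instead of a SQL query
            Some acc
    | Filter ->
        fun acc ->
            let rows = acc.["rows"] :?> string list
            acc.["rows"] <- box (rows |> List.filter (fun r -> not (r.StartsWith "b")))
            Some acc
    | Format ->
        fun acc ->
            let rows = acc.["rows"] :?> string list
            acc.["output"] <- box (String.concat "\n" rows)
            Some acc
    | Archive ->
        // Returning None means "keep the current accumulator".
        fun _ -> None

// Same recursive loop as the post, but with a single branch for all stages.
let executePipeline (pipeline: (string * PipelineStages) list) =
    let rec execute pipeline (accum: Dictionary<string, obj>) =
        match pipeline with
        | [] ->
            printfn "pipeline complete!"
            accum
        | (jobId, stage) :: rest ->
            printfn "job %s: running %A" jobId stage
            match constructStage stage accum with
            | Some next -> execute rest next
            | None -> execute rest accum
    execute pipeline (Dictionary<string, obj>())

let jobId = Guid.NewGuid().ToString()
let result = executePipeline [ (jobId, Fetch); (jobId, Filter); (jobId, Format); (jobId, Archive) ]
```

Returning the dictionary from the empty-list case (instead of unit) makes the final state easy to inspect after a run.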


Again, the code above takes a very simple (almost trivial) approach to building a sequence of steps that can be arranged in any order. This definitely gets us closer to the second objective above. It breaks the process down into tiny pieces, which lets us do some interesting things at each stage, like recording the run time of a given pipeline step, capturing the inputs and outputs of a step, or running a subset of the pipeline (for instance, just two stages instead of four). In the next couple of posts I'll show how I've implemented all of these features and how I got it running in both a Windows service and an ASP.NET MVC 3 / F# application.
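Because each stage is just a function, features like timing or running a subset can be layered on without touching the stages themselves. The sketch below shows one way to do that; it is not the implementation promised for the next posts. The `Stage` record, the `timed` wrapper, the `"<name>.ms"` key convention and the two sample stages are all assumptions made for illustration.

```fsharp
open System.Collections.Generic
open System.Diagnostics

// Assumed shape of a stage: a name plus a function over the shared dictionary.
type Stage =
    { Name: string
      Run: Dictionary<string, obj> -> Dictionary<string, obj> option }

// Wraps a stage so that its elapsed time is stored in the accumulator under "<name>.ms".
let timed (stage: Stage) : Stage =
    { stage with
        Run =
            fun acc ->
                let sw = Stopwatch.StartNew()
                let result = stage.Run acc
                sw.Stop()
                let target = defaultArg result acc // None means "keep the current accumulator"
                target.[stage.Name + ".ms"] <- box sw.ElapsedMilliseconds
                Some target }

// The same kind of recursive loop as in the post, over a list of Stage records.
let runStages (stages: Stage list) =
    let rec go stages (acc: Dictionary<string, obj>) =
        match stages with
        | [] -> acc
        | stage :: rest ->
            match stage.Run acc with
            | Some next -> go rest next
            | None -> go rest acc
    go stages (Dictionary<string, obj>())

// Two hypothetical stages working on fake data.
let fetch =
    { Name = "fetch"
      Run =
        fun acc ->
            acc.["rows"] <- box [ 1; 2; 3; 4 ]
            Some acc }

let keepEven =
    { Name = "filter"
      Run =
        fun acc ->
            let rows = acc.["rows"] :?> int list
            acc.["rows"] <- box (List.filter (fun n -> n % 2 = 0) rows)
            Some acc }

let allStages = [ fetch; keepEven ]

// Full run, timing every stage.
let full = runStages (List.map timed allStages)

// Partial run: only the stages we ask for (here just "fetch"), still timed.
let partial =
    allStages
    |> List.filter (fun s -> s.Name = "fetch")
    |> List.map timed
    |> runStages
```

Keeping the wrapper separate from the stages means the same stage list can be run timed, untimed, or filtered without changing any stage code.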


I'm omitting a few details that I'll explain next time. For instance:
- construct_fetch()
- construct_filter()
- construct_formatter()
- construct_archiver()
are all functions that build the stage functions dynamically, loading them from other assemblies via reflection.

- accum -> a generic dictionary that carries the results of each pipeline stage on to the next, much like ASP.NET does with its HttpRequest object. Another way to think about this is as a simple fold that threads the generic dictionary through as the accumulator.
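To make the fold analogy concrete, here is a minimal sketch, assuming each stage is a function from the dictionary to an optional dictionary, as in the loop above. The three stage functions and their sample data are hypothetical; the point is only how `List.fold` threads the dictionary from one stage to the next.

```fsharp
open System.Collections.Generic

// Hypothetical stages: each takes the shared dictionary and returns Some(dictionary),
// or None to leave the accumulator unchanged.
let fetchStage (acc: Dictionary<string, obj>) =
    acc.["rows"] <- box [ "alice"; "bob"; "carol" ]
    Some acc

let filterStage (acc: Dictionary<string, obj>) =
    let rows = acc.["rows"] :?> string list
    acc.["rows"] <- box (List.filter (fun r -> r <> "bob") rows)
    Some acc

let formatStage (acc: Dictionary<string, obj>) =
    let rows = acc.["rows"] :?> string list
    acc.["output"] <- box (String.concat "," rows)
    Some acc

// The recursive execute loop written as a fold: the dictionary is the accumulator,
// and defaultArg keeps the previous accumulator when a stage returns None.
let runPipeline (stages: (Dictionary<string, obj> -> Dictionary<string, obj> option) list) =
    List.fold (fun acc stage -> defaultArg (stage acc) acc) (Dictionary<string, obj>()) stages

let result = runPipeline [ fetchStage; filterStage; formatStage ]
```

Compared with the hand-written recursive loop, the fold removes the explicit recursion; the trade-off is that stopping the pipeline early (for example on a failed stage) is easier to express in a recursive function.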

Until next time...
-Develop with passion

