Yet Another cGA Implementation, Now in Haskell.

A year ago, I mentioned that I always write a cGA implementation when I learn a new language. Back then, I was trying to get fluent in Haskell again. A couple of days ago, Martin Pelikan did just the same and wanted to compare implementations. So, what did I do? I looked for my implementation to post it here.

I took a look at the code and changed a couple of things, but I can say that the Haskell implementation is the shortest working implementation I have ever written in any language. It is shorter than the versions I wrote in Scala and Erlang. Python could get awkwardly compressed using some functional flavor to get close to this, but dynamic typing… C, C++, Java, Go, and other friends are far away when you look in the rear Haskell mirror. Anyway, the code below implements cGA for binary strings. You choose the population size, the number of bits, and the evaluation function. Also, some of the constructs are so simple and elegant that they do not need much explanation (not to mention maintainability…)

import Data.List.Split
import System.Random

-- Per-gene model update assuming ind1 won: +/- 1/popSize where the individuals differ, 0 elsewhere.
diffBinaryIndividuals popSize ind1 ind2 =
	map (\ (x, y) -> if x == y then 0 else (2 * x - 1) / popSize) $ zip ind1 ind2

-- Evaluate both individuals and shift the model toward the winner.
updateBinaryModel f popSize model ind1 ind2 =
	zipWith (+) model update
	where f1 = f ind1
	      f2 = f ind2
	      update = if f1 > f2 
	      	then diffBinaryIndividuals popSize ind1 ind2 
	      	else diffBinaryIndividuals popSize ind2 ind1 

-- Sample two individuals from the current model (one long sample split in two halves).
sampleTwoBinaryIndividuals model gen =
	chunksOf l $ zipWith (\ m r -> if r < m then 1 else 0) (model ++ model) rnds
	where rnds = take (2 * l) (randoms gen :: [Float])
	      l = length model

-- One cGA step: sample two individuals and update the model accordingly.
cgaStepForBinaryIndividuals f model popSize gen =
	updateBinaryModel f popSize model ind1 ind2
	where ind1 : ind2 : [] = sampleTwoBinaryIndividuals model gen

-- The model has converged once every probability is above 0.9 or below 0.1.
hasModelConverged model = all (\x -> x > 0.9 || x < 0.1) model

-- Iterate cGA steps, drawing a fresh random generator each time, until the model converges.
cga _ _ model | hasModelConverged model = return model
cga f popSize model = do
	gen <- newStdGen
	res <- (cga f popSize (cgaStepForBinaryIndividuals f model popSize gen))
	return res

And you can see it in action below solving 5-bit and 50-bit OneMax problems.

> cga (sum) 1000 (take 5 $ repeat 0.5)
[0.90099484,0.9029948,0.9029948,0.9019948,0.9209946]

> cga (sum) 1000 (take 50 $ repeat 0.5)
[0.9209946,0.9279945,0.96899396,0.96899396,0.95399415,0.9259945,0.9419943,0.96299404,0.9589941,0.9419943,0.93799436,0.9519942,0.9109947,0.94599426,0.95399415,0.9449943,0.94799423,0.964994,0.9199946,0.93199444,0.9429943,0.9569941,0.95499414,0.96999395,0.9369944,0.9579941,0.96199405,0.9429943,0.96099406,0.9359944,0.967994,0.9209946,0.9449943,0.966994,0.9329944,0.95499414,0.96999395,0.9449943,0.90799475,0.9579941,0.95299417,0.93999434,0.94699425,0.9179946,0.9559941,0.90099484,0.9359944,0.9339944,0.9339944,0.9359944]

cGA, Parallelism, Processes, and Erlang

Back in Fall 2006 I was lucky to be at the right place, at the right time. Kumara Sastry and David E. Goldberg were working to pulverize some preconceptions about how far you could scale genetic algorithms. As I said, I was lucky to be able to help as best I could. It turned out that the answer was pretty simple: as far as you want. The key to that result was, again, built on Georges Harik’s compact genetic algorithm. The results were published in a paper titled Toward routine billion-variable optimization using genetic algorithms, if you are curious.

Anyway, back on track. A few days ago, I was playing with Erlang and I coded, just for fun, yet another cGA implementation, now in Erlang. The code was pretty straightforward, so why not take another crack at it and write an Erlang version that uses some of the ideas we used in that paper?

The idea we used in the paper was simple. Slice the probabilistic model into smaller segments and update all those model fragments in parallel. The only caveat, if you go over the cGA model, is that you need the evaluation of two individuals to decide which way to update the model. Also, you need to know when to stop, or when your global model has converged. The flow is pretty simple:

  1. Sample two individuals in parallel.
  2. Compute the partial evaluation (in the example below, the beloved OneMax).
  3. Emit the partial evaluations.
  4. Collect the partial evaluations and compute the final fitness.
  5. Rebroadcast the final evaluation to all model fragments.
  6. With the final evaluations at hand, update the model fragments.
  7. Check whether the local fragment of the model has converged and emit the outcome.
  8. With all the partial convergence checks, decide if the global model has converged.
  9. If the global model has not converged, continue with (1).

The implementation below is quite rough. It could be cleaned up by using functional interfaces to hide all the message passing between processes, but you get the picture. Also, if you look at the implementation, you may notice that the way global fitness and convergence are computed has a single process serializing each of those requests. You may remember Amdahl’s law; this is not a big problem with a few thousand model fragments, but as you scale up you are eventually going to have to worry about it. You could improve it, for instance, by using a broadcast tree. Anyway, let’s put all that aside for now and do a simple implementation to get the ball rolling.

-module(pcga).
-export([one_max/1, cga/6, accumulator/4, has_converged/3, cga_loop/8]).
 
 % Accumulates the partial evaluations.
accumulator(Pids, Values1, Values2, Groups) when length(Pids) == Groups ->
  Acc1 = lists:sum(Values1),
  Acc2 = lists:sum(Values2),
  lists:map(fun(P) -> P ! {final_eval, Acc1, Acc2} end, Pids),
  accumulator([], [], [], Groups);
accumulator(Pids, Values1, Values2, Groups) when length(Pids) < Groups ->
  receive
    {eval, Pid,	Value1, Value2} ->
        accumulator([Pid | Pids], [Value1 | Values1], [Value2 | Values2], Groups);
    stop -> ok
  end.

% Convergence checker.
has_converged(Pids, Votes, Groups) when length(Pids) == Groups ->
  FinalVote = lists:sum(Votes),
  lists:map(fun(P) -> P ! {final_converged, FinalVote == Groups} end, Pids),
  has_converged([], [], Groups);
has_converged(Pids, Votes, Groups) when length(Pids) < Groups ->
  receive
    {converged, Pid, Vote} ->
      has_converged([Pid | Pids], [Vote | Votes], Groups);
    stop -> ok
  end.

% OneMax function.
one_max(String) -> lists:sum(String).
 
% Generates a random string by sampling each position of the Model.
random_string(Model) ->
  lists:map(fun (P) -> case random:uniform() < P of true -> 1; _ -> 0 end end,
            Model).
 
% Generates the initial probabilistic model of length N (all probabilities at 0.5).
initial_model(N) -> repeat(N, 0.5, []).
 
% Given a pair of evaluated strings, returns the update values.
update({_, Fit}, {_, Fit}, N, _) ->
  repeat(N, 0, []);
update({Str1, Fit1}, {Str2, Fit2}, _, Size) ->
  lists:map(fun ({Gene, Gene}) -> 0;
                ({Gene1, _}) when Fit1 > Fit2 -> ((Gene1 * 2) - 1) / Size;
                ({_, Gene2}) when Fit1 < Fit2 -> ((Gene2 * 2) - 1) / Size
            end,
            lists:zip(Str1, Str2)).

% Check if the model has converged.
converged(Model, Tolerance) ->
  lists:all(fun (P) -> (P < Tolerance) or (P > 1 - Tolerance) end, Model).

% Entry point: spawns the accumulator, the convergence checker, and one worker per model fragment.
cga(N, GroupSize, Groups, Fun, Tolerance, Print) 
  when N > 0, GroupSize > 0, Groups > 0, Tolerance > 0, Tolerance < 0.5 ->
  Acc = spawn(pcga, accumulator, [[], [], [], Groups]),
  Con = spawn(pcga, has_converged, [[], [], Groups]),
  lists:foreach(
    fun(_) ->
      spawn(pcga, cga_loop, 
            [N, GroupSize, Fun, initial_model(GroupSize), Tolerance, Acc, Con, Print])
    end,
    repeat(Groups, 1, [])).
 
cga_loop(N, Size, Fitness, Model, Tolerance, Acc, Con, Print) ->
  [{Str1, P1}, {Str2, P2} | _] = lists:map(
    fun (_) -> Str = random_string(Model), {Str, Fitness(Str)} end,
    [1,2]),
  Acc ! {eval, self(), P1, P2},
  receive
    {final_eval, FF1, FF2} ->
      NewModel = lists:map(fun ({M, U}) -> M + U end,
      lists:zip(Model, update({Str1, FF1}, {Str2, FF2}, Size, Size))),
      case converged(NewModel, Tolerance) of
        true -> Con ! {converged, self(), 1};
        false ->  Con ! {converged, self(), 0}
      end,
      receive
        {final_converged, true} -> 
          case Print of 
            true -> io:fwrite("~p\n", [NewModel]);
            _ -> true
          end,
          Acc ! Con ! stop;
        {final_converged, false} -> 
          cga_loop(N, Size, Fitness, NewModel, Tolerance, Acc, Con, Print)
      end
  end.

% Creates a list of length N repeating Value.
repeat(0, _, Update) -> Update;
repeat(N, Value, Update) -> repeat(N - 1, Value, [Value | Update]).

The code above allows you to decide how many model fragments (Groups) you are going to create. Each fragment is assigned to a process. Each fragment holds GroupSize variables of the model, and N is the population size. A simple example of how to run the code:

c(pcga).
pcga:cga(50000, 500, 10, fun pcga:one_max/1, 0.01, true).

The model will contain 5,000 variables split into 10 processes, each containing a fragment of 500 variables, using a population size of 50,000. I guess now the only thing left is to measure how this scales.
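
If the single accumulator and convergence checker ever become the bottleneck discussed above, a reduction tree is one way out. Below is a toy sketch of that idea (the tree_sum module name and code are mine, not part of pcga): it combines partial values through a binary tree of short-lived processes instead of funneling everything through a single process.

-module(tree_sum).
-export([sum/1]).

% Sums a list of numbers with a binary tree of processes: each call splits
% the list in two halves, spawns a process per half, and combines the two
% partial sums. Selective receive keeps the left and right results apart.
sum([]) -> 0;
sum([X]) -> X;
sum(Xs) ->
  {L, R} = lists:split(length(Xs) div 2, Xs),
  Parent = self(),
  spawn(fun() -> Parent ! {left, sum(L)} end),
  spawn(fun() -> Parent ! {right, sum(R)} end),
  receive {left, SL} ->
    receive {right, SR} -> SL + SR end
  end.

For example, tree_sum:sum([1,2,3,4,5]) returns 15. The same shape would let the partial evaluations (or the convergence votes) flow up a tree of processes instead of through the single accumulator and has_converged processes.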

Yet Another cGA Implementation, Now in Erlang.

Wanna have some Sunday afternoon fun? Just refresh your Erlang skills. Since this is me having fun, what better way to do so than to write yet another implementation of the compact Genetic Algorithm (cGA) originally proposed by Georges Harik?

I am going to skip describing the original algorithm and focus a bit on how to implement it in Erlang instead. You can find some nice books elsewhere and more information on the Erlang site. Erlang is an interesting mix of functional and logic programming languages. If you ever wrote code in Prolog, Erlang is going to look familiar. It will also look familiar if you are coming from Haskell although, Erlang being a dynamically typed language, you will miss the type system and inference. Nevertheless, give it a chance. Its concurrency model is worth reading about. I will leave that for future posts though.

Anyway, without further preamble, let’s dive into a naïve implementation of cGA in Erlang. Lists are an integral part of Erlang, hence it seems obvious that individuals could be represented by a list of integers. Under this representation, OneMax is trivial to implement by summing all the elements of the list defining an individual. Following this train of thought, the probabilistic model could also be represented by a simple list of floats (each entry representing the probability of 1 for a given locus).

Given the above description, a cGA implementation just requires: (1) an individual constructor based on sampling the current probabilistic model, (2) a function that, given two evaluated individuals, computes the model update, and (3) a function to check if the probabilistic model has converged. Once these basic functions are available, writing a cGA boils down to sampling two individuals, computing the update based on their evaluations, and updating the probabilistic model. This process is repeated until the model converges. The Erlang code below shows a possible implementation of such an approach.

% Naive implementation of the compact Genetic Algorithm in Erlang.
-module(cga).
-export([one_max/1, cga/4]).

% OneMax function.
one_max(String) -> lists:sum(String).

% Generates a random string by sampling each position of the Model.
random_string(Model) ->
  lists:map(fun (P) -> case random:uniform() < P of true -> 1; _ -> 0 end end,
            Model).

% Generates the initial probabilistic model of length N (all probabilities at 0.5).
initial_model(N) -> repeat(N, 0.5, []).

% Given a pair of evaluated strings, returns the update values.
update({_, Fit}, {_, Fit}, N, _) ->
  repeat(N, 0, []);
update({Str1, Fit1}, {Str2, Fit2}, _, Size) ->
  lists:map(fun ({Gene, Gene}) -> 0;
                ({Gene1, _}) when Fit1 > Fit2 -> ((Gene1 * 2) - 1) / Size;
                ({_, Gene2}) when Fit1 < Fit2 -> ((Gene2 * 2) - 1) / Size
            end,
            lists:zip(Str1, Str2)).

% Check if the model has converged.
converged(Model, Tolerance) ->
  lists:all(fun (P) -> (P < Tolerance) or (P > 1 - Tolerance) end, Model).

% The main cGA loop.
cga(N, Size, Fun, Tolerance) when N > 0, Size > 0, Tolerance > 0, Tolerance < 0.5 ->
  cga_loop(N, Size, Fun, initial_model(N), Tolerance).

cga_loop(N, Size, Fitness, Model, Tolerance) ->
  case converged(Model, Tolerance) of
    true ->
      Model;
    false ->
      [P1, P2 | _] = lists:map(
        fun (_) -> Str = random_string(Model), {Str, Fitness(Str)} end,
        [1,2]),
      cga_loop(N, Size, Fitness,
               lists:map(fun ({M, U}) -> M + U end,
                         lists:zip(Model, update(P1, P2, N, Size))),
               Tolerance)
  end.

% Creates a list of length N repeating Value.
repeat(0, _, Update) -> Update;
repeat(N, Value, Update) -> repeat(N - 1, Value, [Value | Update]).

You can run this code by pasting it into a file named cga.erl. Use the Erlang shell to compile and run cGA as shown below (once you start the Erlang shell via $erl).

1> c(cga).
{ok,cga}
2> cga:cga(3, 30, fun cga:one_max/1, 0.01).
[0.999, 0.989, 0.098]

A couple of interesting considerations. Compiling and loading code in Erlang supports hot code replacement without stopping a running production system. Obviously this property is not critical for the cGA exercise, but it is an interesting one nonetheless. Another one is that functions, thanks to Erlang’s functional programming ancestry, are first-class citizens you can pass around. That means that the current implementation supports passing arbitrary fitness functions without having to change anything in the cGA implementation.
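
For instance, here is a quick hypothetical example (assuming the cga module above is already compiled) that flips the problem around and maximizes the number of zeros simply by passing an anonymous fun as the fitness function:

3> cga:cga(3, 30, fun(Str) -> length(Str) - lists:sum(Str) end, 0.01).

With that single change the model will tend to converge toward probabilities close to zero instead of one, and nothing in cga.erl needs to be touched.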

Finally, I mentioned that this is a naïve implementation to refresh my rusty Erlang syntax. You may want to spend some time profiling this implementation to see how to improve it. Also, you may want to start thinking about how we could take advantage of the concurrency model in Erlang to build a not-so-naïve implementation of cGA.

Revamping My Blog

I have been away from my blog for quite a long time. I have barely posted anything compelling in the last three years. Most of the updates were the sporadic announcements for ACM SigEvo’s GECCO conference, but even that was spotty at best. Yes, like everybody else, I gravitated toward social media (pick your favorite poison here).

I spent quite a bit of time thinking about what I wanted to use my blog for. Should it be the same kind of blog? Should I change it under the hood? Should I give it a golden retirement since it seems I have no stories to share anymore? Then, in the midst of all these unanswered questions, I realized I wanted my blog to be what it has been all along: whatever I need it to be. Yes, some thoughts are faster to share on ephemeral social media outlets, but there are things you want to keep around longer. Hence, I decided to start a face lift as part of this renewed path. Talking about look and feel, I kept it pretty similar, as you may have realized. No big changes, mostly layout updates, removing as much clutter as possible, a bit of font sprinkling here and there, but overall trying to keep it pretty much the same. I guess I like the cozy feeling of it staying familiar.

However, one thing I decided to change, after people I care about deeply kept insisting that I should, was to build a more permanent home, as I mentioned earlier, for those moments you want to keep around long after the rapid pace of social media has digested them into oblivion. Curating photos into gift-wrapped packages you find while window shopping was one of those itches that helped drive the change. As a result, you may now see a ‘Photo Stream’ entry in the top main menu. It is a running stream of some of the photos I share on my G+ profile. Under this running photo stream, you will soon find some of those gift-wrapped packages containing the photos I cannot shake away. Today, I am adding one. Make sure you check it out.

Will this revamping of the blog make me post more often? That is another story.

A Little Functionality and Face Lift

It has been a while since the last face lift to this blog. No, I was not planning any major revamp, just a simple one. Since it was released, I had the little green ShareThis button hanging around. I just wanted to balance the elements on the page a bit, so I decided to reposition the button to the top right of the excerpts and of the posts themselves. While doing the repositioning I decided to simplify its functionality a bit and replace the button with something a bit lighter and still with purpose. After giving it a bit of thought I replaced it with Google’s +1 button for publishers. It looks a little better balanced and it does not clutter the layout. More information on how to add the +1 button to your site can be found at the +1 button page for webmasters.

Minor Update

If you used to check my Twitter stream in the center column at the bottom of my blog, you would have noticed I have just replaced it with my Google Buzz profile stream instead. Yes, you can still see my Twitter activity, but you will also be able to (1) see aggregated content/activity coming from other sources in one place, (2) subscribe to the stream, and (3) comment on each of the entries. As I said, a minor update to improve its functionality a bit more.

Soaring the Clouds with Meandre

You may find the slide deck and the abstract for the presentation we delivered today at the “Data-Intensive Research: how should we improve our ability to use data” workshop in Edinburgh.

Abstract

This talk will focus on a highly scalable data-intensive infrastructure being developed at the National Center for Supercomputing Applications (NCSA) at the University of Illinois and will introduce current research efforts to tackle the challenges presented by big data. Research efforts include exploring potential ways of integration between cloud computing concepts—such as Hadoop or Meandre—and traditional HPC technologies and assets. These architecture models contrast significantly, but can be leveraged by building cloud conduits that connect these resources to provide even greater flexibility and scalability on demand. Orchestrating the physical computational environment requires innovative and sophisticated software infrastructure that can transparently take advantage of the functional features and negotiate the constraints imposed by this diversity of computational resources. Research conducted during the development of the Meandre infrastructure has led to the production of an agile conductor able to leverage the particular advantages in this physical diversity. It can also be implemented as services and/or in the context of another application, benefiting from its reusability, flexibility, and high scalability. Some example applications and an introduction to the data-intensive infrastructure architecture will be presented to provide an overview of the diverse scope of Meandre usages. Finally, a case will be presented showing how software developers and system designers can easily transition to these new paradigms to address the primary data-deluge challenges and to soar to new heights with extreme application scalability using cloud computing concepts.

Related posts:

  1. Meandre: Semantic-Driven Data-Intensive Flows in the Clouds
  2. Data-Intensive Computing for Competent Genetic Algorithms: A Pilot Study using Meandre
  3. [BDCSG2008] Clouds and ManyCores: The Revolution (Dan Reed)

Meandre is going Scala

After quite a bit of experimenting with different alternatives, Meandre is moving to Scala. Scala is a general-purpose programming language designed to express common programming patterns in a concise, elegant, and type-safe way. This is not a radical process, but a gradual one while I am starting to revisit the infrastructure for the next major release. Scala also generates code for the JVM, making mix and match trivial. I started futzing around with Scala back when I started the development of Meandre during the summer of 2007; however, I fell back to Java since that was what most of the people in the group were comfortable with. I was fascinated with Scala’s fusion of object-oriented and functional programming. Time went by and the codebase has grown to a point where I cannot stand cutting through the weeds of Java anymore when I have to extend the infrastructure or do bug fixing—not to mention its verbosity even for writing trivial code.

This summer I decided to go on a quest to get myself out of the woods. I do not mind relying on the JVM and the large collection of libraries available, but I would also like to get my sanity back. Yes, I tested some of the usual suspects for the JVM (Jython, JRuby, Clojure, and Groovy), but none was quite what I wanted. For instance, I wrote most of the Meandre infrastructure services using Jython (much more concise than Java), but I was still not quite happy to jump on that boat. Clojure is also interesting (functional programming), but it would be hard to justify moving the group to it since not everybody may feel comfortable with a purely functional language. I also toyed with some not-so-usual ones like Erlang and Haskell, but again, I ended up with no real argument that could justify such a decision.

So, as I had started doing back in 2007, I went back to my original idea of using Scala and its mixed object-oriented and functional programming paradigm. To test it seriously, I started developing the distributed execution engine for Meandre in Scala using its Erlang-inspired actors. And, boom, suddenly I found myself spending more time thinking than writing/debugging threaded/networking code :D . Yes, I regret my 2007 decision not to run with my original intuition, but better late than never. With a working seed of the distributed engine built and tested (did I mention that ScalaCheck and specs are really powerful tools for behavior-driven development?), I finally decided to start gravitating the Meandre infrastructure development effort from Java to Scala—did I mention that Scala is Martin Odersky’s child? Yes, such a decision has some impact on my colleagues, but I envision that the benefits will eventually outweigh the initial resistance and steep learning curve. At least, in the last two group meetings nobody jumped out of the window while I presented the key elements of Scala and demonstrated how concise and elegant it made the first working seed of the distributed execution engine :D . We even got into discussions about the benefits of using Scala if it delivered everything I showed. I am lucky to work with such smart guys. You can take a peek at the distributed execution engine (a.k.a. Snowfield) at SEASR’s Fisheye.

Oh, one last thing. Are you using Atlassian’s Fisheye? Do you want syntax highlighting for Scala? I tweaked the Java definitions to make it highlight Scala code. Remember to drop the scala.def file in the $FISHEYE_HOME/syntax directory and add an entry to filename.map to make it highlight anything with the extension .scala.

Related posts:

  1. Fast REST API prototyping with Crochet and Scala
  2. Meandre: Semantic-Driven Data-Intensive Flow Engine
  3. Meandre Infrastructure 1.4 RC1 tagged

Temporary storage for Meandre’s distributed flow execution

Designing the distributed execution of a generic Meandre flow involves several moving pieces. One of those is the temporary storage required by the computing nodes (think of one node as one isolated component of a flow) to keep up with the data generated by a component, and also to be able to replicate such storage to the node containing the consumer to be fed. Such storage, local to each node, must guarantee at least three basic properties:

  • Transaction ready
  • Lightweight implementation
  • Efficient writes and reads to minimize contention on ports

Also, it is important to keep in mind that in a distributed execution scenario each node needs to have its own separate and standalone storage system. Thus, it is also important to minimize the overhead of installing and maintaining such a storage subsystem. There are several alternatives available, ranging from traditional relational database systems to home-brewed solutions. Relational database systems provide a distributed, reliable, stable, and well-tested environment, but they tend to require a rather involved installation and maintenance, and tuning them to optimize performance may require quite a bit of monitoring and tweaking. On the other hand, home-brewed solutions can be optimized for performance by dropping non-required functionality and focusing on write and read performance. However, such solutions tend to be bug prone and time consuming, not to mention that proving transaction correctness can be quite involved.

Fortunately there is a middle ground where efficient and stable transaction-aware solutions are available. They may not provide SQL interfaces, but they still provide transaction boundaries. Also, since they are oriented toward maximizing performance, they can provide better throughput and operation latency than having to traverse the SQL stack. Examples of such storage systems can be found in the areas of key-value stores and column stores. Several options were considered while writing these lines, but key-value stores were the ones that best matched the three requirements described above. Several options were informally tested, including solutions like HDF and Berkeley DB; however, the best performing by far under stress-test conditions similar to the sketched temporary storage subsystem was Tokyo Cabinet. I already introduced and tested Tokyo Cabinet more than a year ago, but this time I wanted to give it a stress test to basically convince myself that it was what I wanted to use as the temporary storage for distributed flow execution.

The experiment

Tokyo Cabinet is a collection of storage utilities including, among other facilities, key-value stores implemented as hash files or B-trees, and flexible column stores. The experiment below illustrates the performance and throughput you can achieve. To implement multiple queues on a single casket (the Tokyo Cabinet file containing the data store), B-trees with duplicated keys can help achieve that goal: the duplicated keys are the queue names, and the values are the UUIDs of the objects being stored. Objects are also stored in the same B-tree by using the UUID as a key, and the value becomes the payload to store (usually an array of bytes).

Previously I had been heavily using the Python bindings to test Tokyo Cabinet, but this time I went down the Java route (since the Meandre infrastructure is written in Java). The Java bindings are basically built around JNI and statically link to the C version of the Tokyo Cabinet library, giving you the best of both worlds. To measure how fast I can write data out of a port into the local storage in transactional mode, I used the following piece of code.

	// Snippet from a larger test class: TEST_CASKET_TCB (the casket file name),
	// QUEUE_KEY (the key used for the queue entries), and a fail(...) helper
	// (JUnit-style) are assumed to be defined there, along with imports for
	// tokyocabinet.BDB, java.util.UUID, and java.io.File.
	public static void main ( String args [] ) {
		int MAX = 10000000;
		int inc = 10;
		int cnt = 0;
		float fa [] = new float[8];
		int reps = 10;
 
		for ( int i=1 ; i<=MAX ; i*=inc  ) {
			//System.out.println("Size: "+i);
			for ( int j=0 ; j<reps ; j++ ) {	
				//System.out.println("\tRepetition: "+j);
 
				// open the database
				BDB bdb = new BDB();
 
				if(!bdb.open(TEST_CASKET_TCB, BDB.OWRITER | BDB.OCREAT | BDB.OTSYNC )){
					int ecode = bdb.ecode();
					fail("open error: " + bdb.errmsg(ecode));
				}
 
				// Add a bunch of duplicates
				long start = System.currentTimeMillis();
				bdb.tranbegin();
				for ( int k=0; k<i; k++ ) {
					String uuid = UUID.randomUUID().toString();
					bdb.putdup(QUEUE_KEY, uuid);
					bdb.putdup(uuid.getBytes(), uuid.getBytes());	
				}
				bdb.trancommit();
				fa[cnt] += System.currentTimeMillis()-start;
 
				// Clean up
				bdb.close();
				new File(TEST_CASKET_TCB).delete();
			}
			fa[cnt] /= reps;
			System.out.println(""+i+"\t"+fa[cnt]+"\t"+(fa[cnt]/i));
			cnt++;
		}
	}

The idea is very simple. Just start storing 1, 10, 100, 1,000, 10,000, 100,000, 1,000,000, and 10,000,000 pieces of data at once in a transaction and measure the time. For each size, repeat the operation 10 times and average the times, trying to palliate the fact that the experiment was run on a laptop running all sorts of other concurrent applications. Plot the results to illustrate:

  1. the time required to insert one piece of data as a function of the number of data involved in the transaction
  2. the number of pieces of data written per second as a function of the number of data involved in the transaction

The idea is to expose the behavior of Tokyo Cabinet as more data is involved in a transaction, to check if degradation happens as the volume increases. This is an important issue, since data-intensive flows can generate large volumes of data per firing event.

The results

Results are displayed on the figures below.

Figures: time per data unit as a function of the number of data involved in a transaction, and throughput as a function of the number of data in a transaction.

The first important element to highlight is that the time to insert one data element does not degrade as the volume increases. Actually, it is quite interesting that Tokyo Cabinet feels more comfortable as the volume per transaction grows. The throughput results are also interesting, since they show that it is able to sustain transfers of around 40K data units per second, and that the only bottleneck is the disk cache management and the bandwidth to the disk itself—which gets saturated after pushing more than 10K pieces of data.

The lessons learned

Tokyo Cabinet is an excellent candidate to support the temporary transactional storage required in a distributed execution of a Meandre flow. Other alternatives like MySQL, embedded Apache Derby, the Java edition of Berkeley DB, and SQLite JDBC could not even get close to such performance, falling at least one order of magnitude behind.

Related posts:

  1. Easy, reliable, and flexible storage for Python
  2. ZooKeeper and distributed applications
  3. Meandre: Semantic-Driven Data-Intensive Flow Engine

Liquid: RDF endpoint for FluidDB

A while ago I wrote some thoughts about how to map RDF to and from FluidDB. There I explored how you could map RDF onto FluidDB, and how to get it back. That got me thinking about how to get a simple endpoint you could query for RDF. Imagine that you could pull FluidDB data in RDF; then I could just get all the flexibility of SPARQL for free. With this idea in my mind I just went and grabbed Meandre and the JFluidDB library started by Ross Jones, and built a few components.

The main goal was to be able to get an object, the list of its tags, and express the result in RDF. FluidDB helps the mapping since objects are uniquely identified by URIs. For instance, the object 5ff74371-455b-4299-83f9-ba13ae898ad1 (FluidDB relies on version-four UUIDs of the form xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx) is uniquely identified by http://sandbox.fluidinfo.com/objects/5ff74371-455b-4299-83f9-ba13ae898ad1 (or, in general, a URL of the form http://sandbox.fluidinfo.com/objects/xxxxxxxx-xxxx-4xxx-yxxx-xxxxxxxxxxxx) if you are using the sandbox, or http://fluiddb.fluidinfo.com/objects/5ff74371-455b-4299-83f9-ba13ae898ad1 if you are using the main instance. Same story for tags: the tag fluiddb/about is uniquely identified by the URI http://sandbox.fluidinfo.com/tags/fluiddb/about, or http://fluiddb.fluidinfo.com/tags/fluiddb/about.

A simple RDF description for an object

Once you get the object back, the basic translated RDF version for object a10ab0f3-ef56-4fc0-a8fa-4d452d8ab1db should look like the listing below in TURTLE notation.

<http://sandbox.fluidinfo.com/objects/a10ab0f3-ef56-4fc0-a8fa-4d452d8ab1db>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://sandbox.fluidinfo.com/objects/> , <http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_1>
              <http://sandbox.fluidinfo.com/tags/fluiddb/about> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_2>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/path> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_3>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/description> ;
      <http://purl.org/dc/elements/1.1/description>
              "Object for the attribute fluiddb/default/tags/permission/update/policy"^^<http://www.w3.org/2001/XMLSchema#string> .

I will break the above example into small chunks and explain the three main pieces involved (the id, the about, and the tags). The basic construct is simple. First, a triple to mark the object as a FluidDB object.

<http://sandbox.fluidinfo.com/objects/a10ab0f3-ef56-4fc0-a8fa-4d452d8ab1db>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://sandbox.fluidinfo.com/objects/>   
.

Then, if the object has an about associated on creation, another triple gets generated and added, as shown below. To be consistent, I suggest reusing the DC description property, since that is what the about of an object tends to indicate.

<http://sandbox.fluidinfo.com/objects/a10ab0f3-ef56-4fc0-a8fa-4d452d8ab1db>
      <http://purl.org/dc/elements/1.1/description>
              "Object for the attribute fluiddb/default/tags/permission/update/policy"^^<http://www.w3.org/2001/XMLSchema#string> 
.

Finally, if there are tags associated with the object, a bag gets created, and all the URIs describing the tags get pushed into the bag as shown below.

<http://sandbox.fluidinfo.com/objects/a10ab0f3-ef56-4fc0-a8fa-4d452d8ab1db>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_1>
              <http://sandbox.fluidinfo.com/tags/fluiddb/about> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_2>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/path> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_3>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/description>
.

Creating an RDF endpoint

Armed with the previous pieces, the rest should be easy. Just allow querying for objects, then collect the object information, and finally generate the resulting RDF. Using Meandre and JFluidDB I wrote a few components that allow the simple creation of such an endpoint, as illustrated by the picture below.

Meandre FluidDB RDF endpoint

The basic mechanism is simple. Just push the query into the Query for objects component. This component will stream each of the UUIDs of the matched objects to Read object, which pulls the object information. Then the object is passed to Object to RDF model, which basically generates the RDF snippet shown in the example above for each of the objects pushed. All the RDF fragments are then reduced together by the Wrapped models reducer component, and the resulting RDF model gets serialized into text using the Turtle notation. Finally, the serialized text is printed to the console. The equivalent flow can be expressed as a ZigZag script:

#
# Imports eliminated for clarity
#

#
# Create the component aliases
#
alias  as OBJECT_TO_RDF
alias  as PRINT_OBJECT
alias  as QUERY_FOR_OBJECTS
alias  as READS_THE_REQUESTED_OBJECT
alias  as WRAPPED_MODELS_REDUCER
alias  as MODEL_TO_RDF_TEXT
alias  as PUSH_STRING

#
# Create the component instances
#
push_query_string = PUSH_STRING()
wrapped_models_reducer = WRAPPED_MODELS_REDUCER()
query_for_objects = QUERY_FOR_OBJECTS()
reads_object = READS_THE_REQUESTED_OBJECT()
model_to_rdf_text = MODEL_TO_RDF_TEXT()
print_rdf_text = PRINT_OBJECT()
object_to_rdf_model = OBJECT_TO_RDF()

#
# Set component properties
#
push_query_string.message = "has fluiddb/tag/path"
query_for_objects.fluiddb_url = "http://sandbox.fluidinfo.com"
reads_object.fluiddb_url = "http://sandbox.fluidinfo.com"
model_to_rdf_text.rdf_dialect = "TTL"

#
# Create the flow by connecting the components
#
@query_for_objects_outputs = query_for_objects()
@model_to_rdf_text_outputs = model_to_rdf_text()
@push_query_string_outputs = push_query_string()
@object_to_rdf_model_outputs = object_to_rdf_model()
@reads_object_outputs = reads_object()
@wrapped_models_reducer_outputs = wrapped_models_reducer()

query_for_objects(text: push_query_string_outputs.text)
model_to_rdf_text(model: wrapped_models_reducer_outputs.model)
object_to_rdf_model(object: reads_object_outputs.object)
reads_object(uuid: query_for_objects_outputs.uuid)[+200!]
print_rdf_text(object: model_to_rdf_text_outputs.text)
wrapped_models_reducer(model: object_to_rdf_model_outputs.model)

The only interesting element in the script is the [+200!] entry, which creates 200 parallel copies of the read object component that will concurrently hit FluidDB to pull the data, trying to minimize the latency. The script can then be compiled into a MAU and run. The output of the execution would look like the following:

$ java -jar zzre-1.4.7.jar pull-test.mau 
Meandre MAU Executor [1.0.1vcli/1.4.7]
All rights reserved by DITA, NCSA, UofI (2007-2009)
THIS SOFTWARE IS PROVIDED UNDER University of Illinois/NCSA OPEN SOURCE LICENSE.
 
Executing MAU file pull-test.mau
Creating temp dir pull-test.mau.run
Creating temp dir pull-test.mau.public_resources
 
Preparing flow: meandre://seasr.org/zigzag/1253813636945/4416962494019783033/flow/pull-test-mau/
2009-09-24 12:34:38.480::INFO:  jetty-6.1.x
2009-09-24 12:34:38.495::INFO:  Started SocketConnector@0.0.0.0:1715
Preparation completed correctly
 
Execution started at: 2009-09-24T12:34:38
----------------------------------------------------------------------------
<http://sandbox.fluidinfo.com/objects/a24b4a18-5483-47c6-9b62-0955210c7ebd>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://sandbox.fluidinfo.com/objects/> , <http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_1>
              <http://sandbox.fluidinfo.com/tags/fluiddb/about> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_2>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/path> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_3>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/description> ;
      <http://purl.org/dc/elements/1.1/description>
              "Object for the attribute test/Net::FluidDB-name-1253772095.82845-0.944567286499904"^^<http://www.w3.org/2001/XMLSchema#string> .
 
<http://sandbox.fluidinfo.com/objects/5ff74371-455b-4299-83f9-ba13ae898ad1>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://sandbox.fluidinfo.com/objects/> , <http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_1>
              <http://sandbox.fluidinfo.com/tags/fluiddb/about> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_2>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/path> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_3>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/description> ;
      <http://purl.org/dc/elements/1.1/description>
              "Object for the attribute test/Net::FluidDB-name-1253622685.3231461-0.437099602163897316"^^<http://www.w3.org/2001/XMLSchema#string> .
 
<http://sandbox.fluidinfo.com/objects/67e52346-527e-4bb7-b8f3-05fa8a8ae35b>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://sandbox.fluidinfo.com/objects/> , <http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_1>
              <http://sandbox.fluidinfo.com/tags/fluiddb/about> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_2>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/path> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_3>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/description> ;
      <http://purl.org/dc/elements/1.1/description>
              "Object for the attribute test/Net::FluidDB-name-1253620190.69175-0.861614257420541"^^<http://www.w3.org/2001/XMLSchema#string> .
 
<http://sandbox.fluidinfo.com/objects/8a65a184-03d9-4881-95df-02fa0561a86f>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://sandbox.fluidinfo.com/objects/> , <http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_1>
              <http://sandbox.fluidinfo.com/tags/fluiddb/about> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_2>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/path> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_3>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/description> ;
      <http://purl.org/dc/elements/1.1/description>
              "Object for the attribute fluiddb/namespaces/permission/update/exceptions"^^<http://www.w3.org/2001/XMLSchema#string> .
 
<http://sandbox.fluidinfo.com/objects/335b44e9-a72f-479d-ad60-3661a35231ba>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://sandbox.fluidinfo.com/objects/> , <http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_1>
              <http://sandbox.fluidinfo.com/tags/fluiddb/about> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_2>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/path> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_3>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/description> ;
      <http://purl.org/dc/elements/1.1/description>
              "Object for the attribute test/Net::FluidDB-name-1253776141.95577-0.284175700598524"^^<http://www.w3.org/2001/XMLSchema#string> .
 
<http://sandbox.fluidinfo.com/objects/3bbf1cc6-731c-4e56-a664-adeb5484334f>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://sandbox.fluidinfo.com/objects/> , <http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_1>
              <http://sandbox.fluidinfo.com/tags/fluiddb/about> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_2>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/path> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_3>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/description> ;
      <http://purl.org/dc/elements/1.1/description>
              "Object for the attribute fluiddb/namespaces/permission/delete/policy"^^<http://www.w3.org/2001/XMLSchema#string> .
 
<http://sandbox.fluidinfo.com/objects/aba5adcf-fd44-40ab-b702-9cc635650bc3>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://sandbox.fluidinfo.com/objects/> , <http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_1>
              <http://sandbox.fluidinfo.com/tags/fluiddb/about> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_2>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/path> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_3>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/description> ;
      <http://purl.org/dc/elements/1.1/description>
              "Object for the attribute test/Net::FluidDB-name-1253614713.757-0.604769721717702"^^<http://www.w3.org/2001/XMLSchema#string> .
 
<http://sandbox.fluidinfo.com/objects/f61ceb3b-33df-4356-8e7d-c56d3d0ae338>
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#type>
              <http://sandbox.fluidinfo.com/objects/> , <http://www.w3.org/1999/02/22-rdf-syntax-ns#Bag> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_1>
              <http://sandbox.fluidinfo.com/tags/fluiddb/about> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_2>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/path> ;
      <http://www.w3.org/1999/02/22-rdf-syntax-ns#_3>
              <http://sandbox.fluidinfo.com/tags/fluiddb/tags/description> ;
      <http://purl.org/dc/elements/1.1/description>
              "Object for the attribute test/Net::FluidDB-name-1253615887.80879-0.0437609496034099"^^<http://www.w3.org/2001/XMLSchema#string> .
 
...

That’s it! A first RDF dump of the query!

The not so great news

The current FluidDB API does not provide any method to pull data from more than one object at once. That basically means that for each UUID a separate call to the server needs to be processed, which is a huge latency overhead. The FluidDB guys know about it and they are scratching their heads on how to provide a “multi get”. A full trace of the output can be found in this FluidDB RDF endpoint trace.

This element is crucial for any RDF endpoint. Above I left out a basic element: the time measurements. That part looks like this:

Flow execution statistics

Flow unique execution ID : meandre://seasr.org/zigzag/1253813636945/4416962494019783033/flow/pull-test-mau/8D8E354A/1253813678323/1493255769/
Flow state               : ended
Started at               : Thu Sep 24 12:34:38 CDT 2009
Last update              : Thu Sep 24 12:37:28 CDT 2009
Total run time (ms)      : 170144

Basically 170s to pull only 238 objects (roughly 0.7 seconds per object), where all the time is spent round-tripping to FluidDB.

Getting there

This basically means that such high latency would not allow efficient interactive usage of the endpoint. However, this exercise was useful to prove that simple RDF endpoints for FluidDB are possible and would greatly boost the flexibility of interaction with FluidDB. The current form of the endpoint may still have value if you are not in a hurry, allowing you to run SPARQL queries against FluidDB data and get the best of both worlds.

The code used

If you are interested in running the code, you may need Meandre and the components I put together for the experiment, which you can get from http://github.com/xllora/liquid.

Related posts:

  1. Liquid: RDF meandering in FluidDB
  2. Temporary storage for Meandre’s distributed flow execution
  3. Efficient serialization for Java (and beyond)