Meandre is going Scala

After quite a bit of experimenting with different alternatives, Meandre is moving to Scala. Scala is a general-purpose programming language designed to express common programming patterns in a concise, elegant, and type-safe way. This is not a radical process but a gradual one, taking place as I start to revisit the infrastructure for the next major release. Scala also generates code for the JVM, which makes mixing and matching with existing Java code trivial. I started fooling around with Scala when I began developing Meandre during the summer of 2007; however, I fell back to Java since that was what most of the people in the group were comfortable with. I was fascinated by Scala's fusion of object-oriented and functional programming. Time went by, and the codebase has grown to the point that I can no longer stand cutting through the weeds of Java whenever I have to extend the infrastructure or fix bugs, not to mention its verbosity even for writing trivial code.

This summer I decided to go on a quest to get myself out of the woods. I do not mind relying on the JVM and the large collection of libraries available, but I would also like to get my sanity back. Yes, I tested some of the usual suspects for the JVM (Jython, JRuby, Clojure, and Groovy), but none was quite what I wanted. For instance, I wrote most of the Meandre infrastructure services using Jython (much more concise than Java), but I was still not happy enough to jump on that boat. Clojure is also interesting (functional programming), but it would be hard to justify moving the group to it, since not everybody may feel comfortable with a functional-first language. I also toyed with some not-so-usual ones like Erlang and Haskell, but again, I ended up with no real argument that could justify such a decision.

So, just as I had started doing back in 2007, I went back to my original idea of using Scala and its mixed object-oriented and functional programming paradigm. To test it seriously, I started developing the distributed execution engine for Meandre in Scala using its Erlang-inspired actors. And, boom, suddenly I found myself spending more time thinking than writing and debugging threaded networking code :D . Yes, I regret my 2007 decision not to run with my original intuition, but better late than never. With a first seed of the distributed engine working and tested (did I mention that scalacheck and specs are really powerful tools for behavior-driven development?), I finally decided to start shifting the Meandre infrastructure development effort from Java to Scala (did I mention that Scala is Martin Odersky’s child?). Yes, such a decision has some impact on my colleagues, but I expect that the benefits will eventually outweigh the initial resistance and the steep learning curve. At least, in the last two group meetings nobody jumped out of the window while I presented the key elements of Scala and demonstrated how concise and elegant it made the first working seed of the distributed execution engine :D . We even got into discussions about the benefits of using Scala if it delivered everything I showed. I am lucky to work with such smart guys. If you want to take a peek at the distributed execution engine (a.k.a. Snowfield), you can find it at SEASR’s Fisheye.

Oh, one last thing. Are you using Atlassian’s Fisheye? Do you want syntax highlighting for Scala? I tweaked the Java definitions to make it highlight Scala code. Remember to drop the scala.def file in the $FISHEYE_HOME/syntax directory and add an entry to filename.map so that it highlights anything with the .scala extension.

Related posts:

  1. Fast REST API prototyping with Crochet and Scala
  2. Meandre: Semantic-Driven Data-Intensive Flow Engine
  3. Meandre Infrastructure 1.4 RC1 tagged

Temporary storage for Meandre’s distributed flow execution

Designing the distributed execution of a generic Meandre flow involves several moving pieces. One of those is the temporary storage required by the computing nodes (think of each node as hosting one isolated component of a flow) to keep up with the data generated by a component, and also to replicate that storage to the node containing the consumer to be fed. Such storage, local to each node, must guarantee at least three basic properties.

  • Transaction ready
  • Lightweight implementation
  • Efficient write and read to minimize the contention on ports
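
To make the first property concrete, here is a toy sketch of what "transaction ready" means for such a store: writes are staged and only become visible when the transaction commits, and an abort throws them away. The class StagedStore and its methods are hypothetical names invented for illustration; this is an in-memory stand-in and says nothing about how any real storage engine implements transactions.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory key-value store whose writes only become visible on commit. */
class StagedStore {

    private final Map<String, String> committed = new HashMap<>();
    // Non-null only while a transaction is open
    private Map<String, String> staged = null;

    /** Opens a transaction; nested transactions are not supported in this sketch. */
    void begin() {
        if (staged != null) throw new IllegalStateException("transaction already open");
        staged = new HashMap<>();
    }

    /** Stages a write inside the open transaction. */
    void put(String key, String value) {
        if (staged == null) throw new IllegalStateException("no open transaction");
        staged.put(key, value);
    }

    /** Makes every staged write visible at once and closes the transaction. */
    void commit() {
        if (staged == null) throw new IllegalStateException("no open transaction");
        committed.putAll(staged);
        staged = null;
    }

    /** Discards every staged write and closes the transaction. */
    void abort() {
        staged = null;
    }

    /** Reads only committed data. */
    String get(String key) {
        return committed.get(key);
    }
}
```

A consumer reading with get never sees a half-written batch: either the whole firing event's output is there, or none of it is.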

Also, it is important to keep in mind that in a distributed execution scenario, each node needs its own separate, standalone storage system. Thus, it is also important to minimize the overhead of installing and maintaining such a storage subsystem. There are several alternatives available, ranging from traditional relational database systems to home-brewed solutions. Relational database systems provide a distributed, reliable, stable, and well-tested environment, but they tend to require quite an involved installation and maintenance. Also, tuning those systems to optimize performance may require quite a bit of monitoring and tweaking. On the other hand, home-brewed solutions can be optimized for performance by dropping unneeded functionality and focusing on write and read performance. However, such solutions tend to be bug-prone and time-consuming to build, not to mention that proving transaction correctness can be quite involved.

Fortunately, there is a middle ground where efficient and stable transaction-aware solutions are available. They may not provide SQL interfaces, but they still provide transaction boundaries. Also, since they are designed to maximize performance, they can provide better throughput and operation latency than having to traverse the SQL stack. Examples of such storage systems can be found among key-value stores and column stores. Several options were considered while writing these lines, but key-value stores were the ones that best matched the three requirements described above. Several candidates were informally tested, including HDF and Berkeley DB; however, the best performer by far, under stress-test conditions similar to those the sketched temporary storage subsystem would face, was Tokyo Cabinet. I already introduced and tested Tokyo Cabinet more than a year ago, but this time I gave it a stress test to convince myself that it was what I wanted to use as temporary storage for the distributed flow execution.

The experiment

Tokyo Cabinet is a collection of storage utilities including, among other facilities, key-value stores implemented as hash files or B-trees, and flexible column stores. The experiment below illustrates the performance and throughput you can achieve. To implement multiple queues on a single casket (the Tokyo Cabinet file containing the data store), B-trees with duplicated keys do the job. The duplicated keys are the queue names, and the values are the UUIDs of the objects being stored. The objects themselves are stored in the same B-tree, using the UUID as the key and the payload (usually an array of bytes) as the value.
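
The layout just described can be sketched without touching Tokyo Cabinet at all. The snippet below is a minimal in-memory model, assuming a sorted map of lists as a stand-in for a B-tree with duplicated keys; QueueCasketSketch, enqueue, pending, and payload are hypothetical names made up for this illustration, not part of any library.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.TreeMap;
import java.util.UUID;

/** In-memory stand-in for a B-tree casket that allows duplicated keys. */
class QueueCasketSketch {

    // Sorted map from a key to every value stored under it, in insertion order
    private final TreeMap<String, List<byte[]>> tree = new TreeMap<>();

    /** Appends a value under a key without replacing earlier values. */
    private void putDup(String key, byte[] value) {
        tree.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
    }

    /** Stores the payload under a fresh UUID and records that UUID under the queue name. */
    String enqueue(String queue, byte[] payload) {
        String uuid = UUID.randomUUID().toString();
        putDup(queue, uuid.getBytes(StandardCharsets.UTF_8)); // queue name -> UUID
        putDup(uuid, payload);                                // UUID -> payload
        return uuid;
    }

    /** Returns the UUIDs recorded under a queue name, oldest first. */
    List<String> pending(String queue) {
        List<String> ids = new ArrayList<>();
        for (byte[] raw : tree.getOrDefault(queue, Collections.emptyList())) {
            ids.add(new String(raw, StandardCharsets.UTF_8));
        }
        return ids;
    }

    /** Looks up the payload stored under a UUID, or null if there is none. */
    byte[] payload(String uuid) {
        List<byte[]> values = tree.get(uuid);
        return (values == null || values.isEmpty()) ? null : values.get(0);
    }
}
```

Because queue entries and payloads live in the same tree, a single transaction can cover both the "this object belongs to that queue" record and the object itself.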

Previously, I had been using the Python bindings heavily to test Tokyo Cabinet, but this time I went down the Java route (since the Meandre infrastructure is written in Java). The Java bindings are basically built around JNI and statically link to the C version of the Tokyo Cabinet library, giving you the best of both worlds. To measure how fast I can write data out of a port into the local storage in transactional mode, I used the following piece of code.

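	import java.io.File;
	import java.util.UUID;

	import tokyocabinet.BDB;

	public class CasketWriteBenchmark {

		// Assumed values: the original listing does not show these constants
		static final String TEST_CASKET_TCB = "test-casket.tcb";
		static final String QUEUE_KEY = "queue";

		public static void main ( String args [] ) {
			int MAX = 10000000;          // largest transaction size
			int inc = 10;                // growth factor between sizes
			int cnt = 0;                 // index of the current size
			float fa [] = new float[8];  // average time in ms, one slot per size (10^0..10^7)
			int reps = 10;               // repetitions per size

			for ( int i=1 ; i<=MAX ; i*=inc  ) {
				for ( int j=0 ; j<reps ; j++ ) {

					// Open (and create if needed) the B-tree database
					BDB bdb = new BDB();

					if(!bdb.open(TEST_CASKET_TCB, BDB.OWRITER | BDB.OCREAT | BDB.OTSYNC )){
						int ecode = bdb.ecode();
						// The original called a test helper fail(...); an exception stands in for it here
						throw new IllegalStateException("open error: " + bdb.errmsg(ecode));
					}

					// Add i queue entries and their payloads in a single transaction
					long start = System.currentTimeMillis();
					bdb.tranbegin();
					for ( int k=0; k<i; k++ ) {
						String uuid = UUID.randomUUID().toString();
						bdb.putdup(QUEUE_KEY, uuid);                    // queue name -> UUID
						bdb.putdup(uuid.getBytes(), uuid.getBytes());   // UUID -> payload
					}
					bdb.trancommit();
					fa[cnt] += System.currentTimeMillis()-start;

					// Clean up so every repetition starts from an empty casket
					bdb.close();
					new File(TEST_CASKET_TCB).delete();
				}
				// Print: size, average time (ms), average time per item (ms)
				fa[cnt] /= reps;
				System.out.println(""+i+"\t"+fa[cnt]+"\t"+(fa[cnt]/i));
				cnt++;
			}
		}
	}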

The idea is very simple. Just start storing 1, 10, 100, 1000, 10000, 100000, 1000000, and 10000000 pieces of data at once in a transaction. Measure the time. For each transaction size, repeat the operation 10 times and average the time, to mitigate the fact that the experiment was run on a laptop running all sorts of other concurrent applications. Plot the results to illustrate:

  1. time required to insert one piece of data as a function of the number of data items involved in the transaction
  2. number of pieces of data written per second as a function of the number of data items involved in the transaction

The idea is to expose the behavior of Tokyo Cabinet as more data is involved in a transaction, to check whether degradation happens as the volume increases. This is an important issue, since data-intensive flows can generate large volumes of data per firing event.
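
Both plotted quantities follow directly from one averaged timing per transaction size. The helper below is a small sketch of that arithmetic; TransactionMetrics and its method names are hypothetical, and the numbers in the usage note are made-up inputs chosen only to show the conversion, not measurements.

```java
/** Derives time per item and throughput from one averaged transaction timing. */
class TransactionMetrics {

    /** Average milliseconds spent per item in a transaction of the given size. */
    static double millisPerItem(double avgMillis, long items) {
        return avgMillis / items;
    }

    /** Items written per second; infinite when the elapsed time measured as 0 ms. */
    static double itemsPerSecond(double avgMillis, long items) {
        if (avgMillis == 0.0) {
            // A millisecond clock can report 0 for very small transactions
            return Double.POSITIVE_INFINITY;
        }
        return items * 1000.0 / avgMillis;
    }
}
```

For instance, a hypothetical transaction of 10000 items averaging 250 ms works out to 0.025 ms per item, or 40000 items per second. The zero check matters for the smallest sizes, where a millisecond clock is too coarse to resolve a single insert.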

The results

Results are displayed on the figures below.

[Figure: Time per data unit as a function of the number of data items involved in a transaction]

[Figure: Throughput as a function of the number of data items in a transaction]

The first important element to highlight is that the time to insert one data element does not degrade as the volume increases. Actually, it is quite interesting that Tokyo Cabinet feels more comfortable as the volume per transaction grows. The throughput results are also interesting, since they show that it is able to sustain transfers of around 40K data units per second, and that the only bottleneck is the disk cache management and the bandwidth to the disk itself, which gets saturated after pushing more than 10K pieces of data.

The lessons learned

Tokyo Cabinet is an excellent candidate to support the temporary transactional storage required in a distributed execution of a Meandre flow. Other alternatives such as MySQL, embedded Apache Derby, the Java edition of Berkeley DB, and SQLite JDBC could not even get close to such performance, falling at least one order of magnitude behind.

Related posts:

  1. Easy, reliable, and flexible storage for Python
  2. ZooKeeper and distributed applications
  3. Meandre: Semantic-Driven Data-Intensive Flow Engine