August, 2009
Merging Data And Processing: Why it doesn’t “just work”
Sunday, August 30th, 2009
If you’ve been using Coherence for a while (or any other distributed cache service like Gigaspaces or Gemstone) you may well have had that wonderful ‘penny dropping’ moment when considering the collocation of data and processing. Suddenly you can perceive architectures where you no longer need to move all that data around before operating on it. Your grid already has it there at your disposal.
As a toy example, let's consider pricing a large portfolio of trades. The pricing algorithm would require trade and market data as input, but as these are logically distinct entities you are likely to store each in a different cache. For efficiency, however, you'll need each trade and its corresponding market data to sit on the same node, so that no wire calls are needed to collocate them prior to pricing.
Coherence gives you a great way to do this: Affinity. Affinity instructs Coherence to group data so that all items sharing the same 'affinity key' are stored together (see figures).
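A minimal sketch of how this might be declared in code, using Coherence's `KeyAssociation` interface (the `TradeKey` class and its fields are hypothetical names for this example):

```java
import com.tangosol.net.cache.KeyAssociation;
import java.io.Serializable;

// Trades keyed by TradeKey are stored in the same partition (and hence on
// the same node) as the market data entry keyed by the same ticker.
public class TradeKey implements KeyAssociation, Serializable {
    private final String tradeId;
    private final String ticker;

    public TradeKey(String tradeId, String ticker) {
        this.tradeId = tradeId;
        this.ticker = ticker;
    }

    public Object getAssociatedKey() {
        return ticker; // the 'affinity key'
    }

    public boolean equals(Object o) {
        if (!(o instanceof TradeKey)) return false;
        TradeKey that = (TradeKey) o;
        return tradeId.equals(that.tradeId) && ticker.equals(that.ticker);
    }

    public int hashCode() {
        return tradeId.hashCode();
    }
}
```

Alternatively a `KeyAssociator` can be configured on the cache service, which keeps the association out of the key class itself.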
Thinking along these lines, it might seem we have solved our pricing problem: we can use affinity to keep the trade and market data together. As it happens, this does work (depending somewhat on your data distribution). However, it all falls to bits when you need to run the processing that actually prices the trade.
The problem is that you want to wrap the processing in a Coherence function that is 'data aware', most likely an Aggregator or possibly an Entry Processor. The reasoning is that these functions automatically route themselves to the nodes where the data resides.
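As a rough sketch (the `PricingAggregator` class is hypothetical), invoking such a function is a one-liner, and Coherence fans it out to the storage nodes for you:

```java
NamedCache trades = CacheFactory.getCache("Trades");

// PricingAggregator runs on each storage-enabled node against the trades
// held locally there; only the (small) results travel back over the wire.
Object results = trades.aggregate(AlwaysFilter.INSTANCE, new PricingAggregator());
```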
The alternative approach is to use an Invocable, but this is not data aware, so you have to write extra code to route each request to the correct node (perfectly possible, but not the most elegant or efficient solution).
So, persisting with a data-aware function as the wrapper for our pricing algorithm, let's say an Aggregator, you quickly hit a problem with the way Coherence is architected internally. Aggregators run inside the Cache Service (i.e. the service that manages data in Coherence), and the Cache Service threading model does not permit re-entrant calls [1].
So what does that mean? It means that, if you ran your Aggregator against the trades cache, you would not be able to call out from that Aggregator into the Market Data cache to get the data you require to price the trade. Such a call would ultimately cause a deadlock.
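In code, the problematic pattern looks something like this (class names, `Trade`/`MarketData` types and the `price` routine are all hypothetical); the `get` on the second cache is the re-entrant call:

```java
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.processor.AbstractProcessor;

// DO NOT do this: a function on the Trades cache calling back into another
// cache managed by the same cache service.
public class NaivePricingProcessor extends AbstractProcessor {
    public Object process(InvocableMap.Entry entry) {
        Trade trade = (Trade) entry.getValue();

        // Re-entrant call: this request is queued back into the very cache
        // service we are currently blocking, so it can deadlock.
        NamedCache marketData = CacheFactory.getCache("MarketData");
        MarketData md = (MarketData) marketData.get(trade.getTicker());

        return price(trade, md); // hypothetical pricing routine
    }
}
```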
The diagram demonstrates the Cache Service threading model under a simulated deadlock. Even when the Cache Service is configured with a thread pool, there is the possibility that a re-entrant call will be scheduled back to the very worker thread that is making it, particularly when the thread pool is small and the EntryProcessor workload is long-running.
A workaround for this problem is to place the parent cache (or more precisely, the cache against which the Entry Processor or Aggregator is run) in a different Cache Service from the cache the function calls into. By splitting across at least two Cache Services, the call to the 'other' cache enters via a different main thread from the one that invoked the Aggregator you are currently running. This removes the possibility of deadlock.
However, for our use case, splitting the market data cache and trades cache into different cache services is not an option, as it breaks affinity. The data items will no longer collocate (affinity is based on the hashing algorithm Coherence uses to distribute data, and that algorithm operates at the cache-service level).
So how do you solve this problem? Well, you have two options (sketches of both follow below).

- You sidestep the threading model by accessing the backing map directly. This way you can access the data in the market data cache on the thread you are already on (without Coherence re-queuing the request). The problem with this method is that it is a back door, and that leaves you open to potential problems (could there be a time when you expect the item to be in the local JVM but it is not?).
- As mentioned earlier, you wrap your request in an Invocable (which does not have the same threading issues, as it runs in the Invocation Service) and route it to the correct node yourself. This is described in the final diagram.
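Here is a minimal sketch of the first option, assuming 3.x-era APIs (`BackingMapManagerContext.getBackingMap` was the accessor at the time of writing) and hypothetical `Trade`/`MarketData` types:

```java
import com.tangosol.net.BackingMapManagerContext;
import com.tangosol.util.Binary;
import com.tangosol.util.BinaryEntry;
import com.tangosol.util.InvocableMap;
import com.tangosol.util.processor.AbstractProcessor;
import java.util.Map;

public class BackingMapPricingProcessor extends AbstractProcessor {
    public Object process(InvocableMap.Entry entry) {
        Trade trade = (Trade) entry.getValue();

        // Go straight to the local backing map for MarketData rather than
        // back through the cache service's front door (no re-queuing).
        BackingMapManagerContext ctx = ((BinaryEntry) entry).getContext();
        Map marketDataMap = ctx.getBackingMap("MarketData");

        // Backing maps hold serialised (Binary) keys and values, so convert
        // explicitly in each direction.
        Binary binKey   = (Binary) ctx.getKeyToInternalConverter().convert(trade.getTicker());
        Binary binValue = (Binary) marketDataMap.get(binKey);
        MarketData md   = (MarketData) ctx.getValueFromInternalConverter().convert(binValue);

        return price(trade, md); // hypothetical pricing routine
    }
}
```

And a sketch of the second: look up the member that owns the trade's partition and target the Invocable at it directly. `PriceTradeInvocable` is hypothetical, and an invocation service named "InvocationService" is assumed to be configured:

```java
import com.tangosol.net.CacheFactory;
import com.tangosol.net.DistributedCacheService;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;
import java.util.Collections;
import java.util.Map;

public Map priceTradeRemotely(Object tradeKey) {
    // Ask the cache service which member currently owns this key...
    DistributedCacheService cacheService =
            (DistributedCacheService) CacheFactory.getCache("Trades").getCacheService();
    Member owner = cacheService.getKeyOwner(tradeKey);

    // ...then run the Invocable there. The Invocation Service has no
    // re-entrancy restriction, so it may freely read both caches.
    InvocationService invocation =
            (InvocationService) CacheFactory.getService("InvocationService");
    return invocation.query(new PriceTradeInvocable(tradeKey),
                            Collections.singleton(owner));
}
```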
As to which is best? Well, I guess that depends on how risk averse you are 😉 but for what it's worth, I use the former.
[1] http://coherence.oracle.com/display/COH35UG/Constraints+on+Re-entrant+Calls
Coherence Part IV: Merging Data And Processing
Saturday, August 29th, 2009
A lot of people start using Oracle Coherence as a distributed cache because they need to get away from a data-bottlenecking problem. Many open source NoSQL stores will help you with this problem too (and if all you need is to stream large data volumes, being solely in memory is unlikely to be worth the additional hardware cost). However, there are some big advantages to being entirely in memory: distributed execution occurs next to the data it needs to operate on, either on request or as the result of some state change (think trigger), and this is a very powerful tool. It can lead to one of those 'penny-dropping' moments as the potential of merging data and processing, particularly in a wholly in-memory architecture, begins to unfold.
The benefits of moving computation to data have been around for a very long time – stored procedures being the classic example. The possibilities are extended significantly when the processing space is a distributed data grid, with all logic executing in the same language (in this case Java) and with data represented hierarchically (as objects) rather than relationally. Suddenly a whole world of fast, distributed processing on collocated data opens up.
Interestingly, this is one of the main drivers for MapReduce (e.g. Hadoop): deal with very large data sets in a simple (albeit somewhat brute-force) way, collocating data and processing (although in Hadoop's case it's disk based) to allow processing to scale to peta- or exabytes. The same pattern can be applied in Coherence, but with a slightly different goal: extending your application tier to allow real-time processing in a virtual address space that can grow to terabytes.
There are a couple of points worthy of note before we go on:
- The process of Merging Data and Processing is not seamless. The details of this are covered in another post. This article is meant solely as an introduction.
- There are in fact databases with exactly the same benefits with respect to merging data and processing. VoltDB is closest (solely in memory, Java stored procedures), but there are many other shared-nothing DBs with impressive performance. Exasol and Paraccel are two worthy of note.
Data Affinity: Ensuring Collocation of Disparate Data Sets
Data affinity allows associations to be set up between data in different caches, so that the associated data objects in the two caches are collocated on the same machine. In the example here, trade data and market data are linked via the ticker, meaning that all trades for ticker ATT will be stored on the same machine as the ATT market data.
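As a rough sketch (the cache names, `TradeKey` class and data objects are hypothetical), assuming the trade key implements Coherence's `KeyAssociation` interface so that its associated key is the ticker:

```java
NamedCache marketData = CacheFactory.getCache("MarketData");
NamedCache trades = CacheFactory.getCache("Trades");

// Market data is keyed directly by ticker; the trade key's associated key
// is also the ticker, so both entries hash to the same partition and are
// therefore held on the same machine.
marketData.put("ATT", attMarketData);
trades.put(new TradeKey("T1", "ATT"), attTrade);
```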
Using Coherence to Run Processing in the Grid
Thus when an Entry Processor or Aggregator executes, say to run a trade pricing routine, it can access the trade and its market data without making a wire call, as the market data for that particular trade will be held on the same machine (whenever possible).
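Sketching the call site (with a hypothetical `PriceTradeProcessor`), the pricing routine is shipped to the data rather than the data to the routine:

```java
NamedCache trades = CacheFactory.getCache("Trades");

// Executes on the member that owns this trade's partition; only the key,
// the processor and the result cross the wire.
Object price = trades.invoke(tradeKey, new PriceTradeProcessor());
```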
This presents the possibility of folding the classic service-centric approach in two [1]: the compute and data tiers can be merged into one layer that has responsibility for both. The fundamental advantage is that far less data needs to be transmitted across the wire.
Increased Wire Efficiency
In a standard architecture (the upper example) data is retrieved from the data source and sent to the application tier for processing. In the Coherence Application-Centric approach (the lower example) the code is instead sent to the machine that holds the data, and executed there. This is one of the real penny-dropping concepts that can revolutionise a system's performance.
But it is important to note that Coherence is not a direct substitute for a compute grid such as DataSynapse. Application-Centric Coherence involves leveraging the inherent distribution Coherence provides, as well as its collocation of processing and data.
Thus, looking at the anatomy of a simple Application-Centric deployment, we see the following (a sketch of the cache-store step follows the list):
- A feed server enters a trade into the Trade cache using an Entry Processor to execute some pre-processing.
- This in turn fires a CacheStore which reliably executes some domain processing for that trade on the same machine.
- The domain processing results in the trade being updated in the cache.
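A minimal sketch of that cache-store step, assuming a hypothetical `TradeStore` registered against the Trades cache in the cache configuration, with a hypothetical `enrich` routine standing in for the domain processing:

```java
import com.tangosol.net.cache.AbstractCacheStore;

// Fired by the Trades cache on the member that owns the trade, so the
// domain processing runs collocated with the data. Configured with
// write-behind, the execution is asynchronous and fault tolerant.
public class TradeStore extends AbstractCacheStore {
    public void store(Object key, Object value) {
        enrich((Trade) value); // hypothetical domain processing
    }

    public Object load(Object key) {
        return null; // write-only in this example; nothing to load
    }
}
```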
This is just one sample pattern; there are many others. Simply using Aggregators (think MapReduce) to distribute work to collocated data on the grid is a powerful pattern in its own right. All these patterns share the ability to collocate domain processing, written in Java, across a large, distributed address space. This means that not only is the execution collocated with the data, but the executions are also implicitly load balanced across the Coherence cluster.
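For example, a MapReduce-style sum over the grid might be sketched as follows (`Trade` and `getNotional` are hypothetical names; `ParallelAwareAggregator` is the grid-side execution interface in Coherence 3.x):

```java
import com.tangosol.util.InvocableMap;
import java.io.Serializable;
import java.util.Collection;
import java.util.Iterator;
import java.util.Set;

// aggregate() is the "map" step, run on each storage node against its local
// entries; aggregateResults() is the "reduce" step, run once on the caller.
public class TotalNotionalAggregator
        implements InvocableMap.ParallelAwareAggregator, Serializable {

    public InvocableMap.EntryAggregator getParallelAggregator() {
        return this;
    }

    public Object aggregate(Set entries) {
        double total = 0;
        for (Iterator i = entries.iterator(); i.hasNext(); ) {
            InvocableMap.Entry entry = (InvocableMap.Entry) i.next();
            total += ((Trade) entry.getValue()).getNotional(); // hypothetical field
        }
        return new Double(total);
    }

    public Object aggregateResults(Collection partialResults) {
        double total = 0;
        for (Iterator i = partialResults.iterator(); i.hasNext(); ) {
            total += ((Double) i.next()).doubleValue();
        }
        return new Double(total);
    }
}
```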
So Coherence has evolved from being a data repository to an application container which provides:
- Distribution of processing across multiple machines
- Fault tolerance of data and processing (including async)
- Scalability to potentially thousands of nodes
- The ability to collocate data and processing.
An enticing proposition!!!
[1] Service-Centric and Application-Centric are terms coined by Lewis Foti to describe the two broad architectural styles used to build Coherence-based systems. Service-Centric architectures use Coherence simply as a data repository. Application-Centric architectures use Coherence as a framework for building event-based distributed systems. Such systems leverage the inherent distribution and fault tolerance that come with the product, with operations generally collocated with the data they require. This merges the Application and Data layers of the system.
See also:
- coherence-bootstrap on github
- Coherence Part I: An Introduction
- Coherence Part II: Delving a Little Deeper
- Coherence Part III: The Coherence Toolbox