Sizing Coherence Indexes

Monday, April 28th, 2014

This post suggests three ways to measure index sizes in Coherence: (a)Using the Coherence MBean (not recommended) (b) Use JMX to GC the cluster (ideally programatically) as you add/remove indexes – this is intrusive (c) Use the SizeOf invocable (recommended) – this requires teh use of a -javaagent option on your command line.

(a) The Coherence MBean

Calculating the data size is pretty easy, you just add a unit calculator and sum over the cluster (there is code to do that here: test, util). Indexes however are more tricky.

Coherence provides an IndexInfo MBean that tries to calculate the size. This is such an important factor in maintaining you cluster it’s worth investigating.

Alas the IndexInfo Footprint is not very accurate. There is a test,IsCoherenceFootprintMBeanAccurate.java,which demonstrates there are huge differences in some cases (5 orders of magnitude). In summary:

– The Footprint is broadly accurate for fairly large fields (~1k) where the index is unique.
– As the cardinality of the index drops the Footprint Mbean starts to underestimate the footprint.
– As the size of the field being indexed gets smaller the MBean starts to underestimate the index.

Probably most importantly for the most likely case, for example the indexed fields is fairly small say 8B, and the cardinality is around half the count, the MBean estimate is out by three orders of magnitude.

Here are the results for the cardinality of half the count and field sizes 8B, 16B, 32B

     Ran: 32,768 x 32B fields [1,024KB indexable data], Cardinality of 512 [512 entries in index, each containing 64 values], Coherence MBean measured: 49,152B. JVM increase: 3,162,544B. Difference: 6334%
     Ran: 65,536 x 16B fields [1,024KB indexable data], Cardinality of 512 [512 entries in index, each containing 128 values], Coherence MBean measured: 40,960B. JVM increase: 5,095,888B. Difference: 12341%
     Ran: 131,072 x 8B fields [1,024KB indexable data], Cardinality of 512 [512 entries in index, each containing 256 values], Coherence MBean measured: 40,960B. JVM increase: 10,196,616B. Difference: 24794%

In short, it’s too inaccurate to be useful.

(b) Using JMX to GC before and after adding indexes

So the we’re left with a more intrusive process to work out our index sizes:

  1. Load your cluster up with indexes.
  2. GC a node and take it’s memory footprint via JMX/JConsole/VisualVm
  3. Drop all indexes
  4. GC the node again and work out how much the heap dropped by.

I have a script which does this programatically via JMX. It cycles through all the indexes we have doing:

ForEach(Index) { GC->MeasureMemory->DropIndex->GC->MeasureMemory->AddIndexBack }

This method works pretty well although it takes a fair while to run if you have a large number of indexes, and is intrusive so you couldn’t run in production. It also relies on our indexes all being statically declared in a single place. This is generally a good idea for any project. I don’t know of a way to extract the ValueExtractor programatically from Coherence so just use the static instance in our code.

(c) Use Java’s Inbuilt Instrumentation

This is the best way (in my opinion). It’s simple and accurate. The only issue is that you need to start your coherence processes with a javaagent as it’s using the Instrumentation API to size indexes.

The instrumentation agent itself is very simple, and uses the library found here. All we need to wrap that is an invocable which executes it on each node in the cluster.

The invocable just loops over each cache service, and each cache within that service, calculating the size of the IndexMap using the instrumentation SizeOf.jar

To implement this yourself:

1) Grab these two classes: SizeOfIndexSizer.java, IndexCountingInvocable.java and add them to your classpath. The first sets the invocable off, handling the results. The second is the invocable that runs on each node and calculates the size of the index map.

2) Take a copy of SizeOf.jar from here and add -javaagent:<pathtojar>/SizeOf.jar to your command line.

3) Call the relevant method on SizeOfIndexSizer.




When is POF a Good Idea?

Saturday, April 12th, 2014

POF is pretty cool. Like Protocol Buffers, which they are broadly similar to, POF provides an space-efficient, byte-packed wire / storage format which is navigable in its binary form. This makes it a better than Java serialisation for most applications (although if you’re not using Coherence then PB are a better bet).

Being a bit-packed format it’s important to understand the performance implications of extracting different parts of the POF stream. This being different to the performance characteristics of other storage formats, in particular fixed width formats such as those used in most databases which provide very fast traversal.

To get an understanding of the POF format see the primer here. In summary:

1. Smaller than standard Java Serialisation: The serialised format is much smaller than java serialisation as only integers are encoded in the stream rather than the fully class/type information.
2. Smaller than fixed-width formats: The bit-packed format provides a small memory footprint when compared to fixed length fields and doesn’t suffer from requiring overflow mechanisms for large values. This makes it versatile.
3. Navigable: The stream can be navigated to read single values without deserialising the whole stream (object graph).

Things to Watch Out For:

1. Access to fields further down the stream is O(n) and this can become dominant for large objects:

Because the stream is ‘packed’, rather than using fixed length fields, traversing the stream is O(n), particularly the further down the stream you go. That’s to say extracting the last element will be slower than extracting the first. Fixed width fields have access times O(1) as they can navigate to a field number directly.

We can measure this using something along the lines of:

Binary pof = ExternalizableHelper.toBinary(object, context);
SimplePofPath path = new SimplePofPath(fieldPos);//vary the position in the stream
PofExtractor pofExtractor = new PofExtractor(ComplexPofObject.class, path);

while (count --&gt; 0) {
    PofValue value = PofValueParser.parse(pof, context);

If you want to run this yourself it’s available here: howMuchSlowerIsPullingDataFromTheEndOfTheStreamRatherThanTheStart(). This code produces the following output:

&gt; Extraction time for SimplePofPath(indices=1) is 200 ns
&gt; Extraction time for SimplePofPath(indices=2) is 212 ns
&gt; Extraction time for SimplePofPath(indices=4) is 258 ns
&gt; Extraction time for SimplePofPath(indices=8) is 353 ns
&gt; Extraction time for SimplePofPath(indices=16) is 564 ns
&gt; Extraction time for SimplePofPath(indices=32) is 946 ns
&gt; Extraction time for SimplePofPath(indices=64) is 1,708 ns
&gt; Extraction time for SimplePofPath(indices=128) is 3,459 ns
&gt; Extraction time for SimplePofPath(indices=256) is 6,829 ns
&gt; Extraction time for SimplePofPath(indices=512) is 13,595 ns
&gt; Extraction time for SimplePofPath(indices=1024) is 27,155 ns

It’s pretty clear (and not really surprising) that the navigation goes O(n). The bigger problem is that this can have an affect on your queries as your datasize grows.

Having 100 fields in a pof object is not unusual, but if you do, the core part of your query is going to run 20 slower when retrieving the last field than it is when you retrieve the first.

For a 100 field object, querying on the 100th field will be 20 times slower than querying the first

This is just a factor of the variable length encoding. The code has no context of the position of a particular field in the stream when it starts traversing it. It has no option but to traverse each value, find it’s length and skip to the next one. Thus the 10th field is found by skipping the first 9 fields. This is in comparison to fixed length formats where extracting the nth field is always O(1).

2) It can be more efficient to deserialise the whole object, and makes your code simpler too

If you’re just using a simple filter (without an index) POF makes a lot of sense, use it, but if you’re doing more complex work that uses multiple fields from the stream then it can be faster to deserialise the whole object. It also makes your code a lot simpler as dealing with POF directly gets pretty ugly as the complexity grows.

We can reason about whether it’s worth deserialising the whole object by comparing serialisation times with the time taken to extract multiple fields.

The test whenDoesPofExtractionStopsBeingMoreEfficient() measures the break even point beyond which we may as well deserialise the whole object. Very broadly speaking it’s 4 extractions, but lets look at the details.

Running the test yields the following output:

On average full deserialisation of a 50 field object took 3225.0ns
On average POF extraction of first 5 fields of 50 took 1545.0ns
On average POF extraction of last 5 fields of 50 took 4802.0ns
On average POF extraction of random 5 fields of 50 took 2934.0ns

Running this test and varying the number of fields in the object leads to the following conclusions.

– for objects of 5 fields the break even point is deserialising 2 pof fields
– for objects of 20 fields the break even point is deserialising 4 pof fields
– for objects of 50 fields the break even point is deserialising 5 pof fields
– for objects of 100 fields the break even point is deserialising 7 pof fields
– for objects of 200 fields the break even point is deserialising 9 pof fields

Or to put it another way, if you have 20 fields in your object and you extracted them one at a time it would be five times slower than deserialising.

In theory the use of the PofValue object should optimise this. The PofValueParser, which is used to create PofValue objects, effectively creates an index over the pof stream meaning that, in theory, reading multiple fields from the pof value should be O(1) each. However in these test I have been unable to see this gain.

Pof is about more than performance. It negates the need to put your classes (which can change) on the server. This in itself is a pretty darn good reason to use it. However it’s worth considering performance. It’s definitely faster than Java serialisation. That’s a given. But you do need to be wary about using pof extractors to get individual fields, particularly if you have large objects.

The degradation is O(n), where n is the number of fields in the object, as the stream must be traversed one field at a time. This is a classic space/time tradeoff. The alternative, faster O(1), fixed-width approach would require more storage which can be costly for in memory technologies.

Fortunately there is a workaround of sorts. If you have large objects, and are using POF extraction for your queries (i.e. you are not using indexes which ensure a deserialised field will be on the heap), then prefer composites of objects to large (long) flat ones. This will reduce the number of skipPofValue() calls that the extractor will have to do.

If you have large objects and are extracting many fields to do their work (more than 5-10 extractions per object) then it may be best to deserialise the whole thing. In cases like this pof-extraction will be counter productive, at least from a performance perspective. Probably more importantly, if you’re doing 5-10 extractions per object, you are doing something fairly complex (but this certainly happens in Coherence projects) so deserialising the object and writing your logic against PoJos is going to make your code look a whole lot better too. If in doubt, measure it!

Ref: JK posted on this too when we first became aware of the problem.

POF Primer

Saturday, April 12th, 2014

This is a brief primer on POF (Portable Object Format) used in Coherence to serialise data. POF is much like Google’s Protocol Buffers so if you’re familiar with those you probably don’t need to read this.

POF a variable length, bit-packed serialisation format used to represent object graphs as byte arrays in as few bytes as possible, without the use of compression. Pof’s key property is that it is navigable. That is to say you can pull a value (object or primitive) out of the stream without having to deserilalise the whole thing. A feature that is very useful if you want to query a field in an object which is not indexed.

The Format

Conceptually simple, each class writes out its fields to a binary stream using a single bit-packed (variable length encoded) integer as an index followed by a value. Various other pieces of metadata are also encoded into the stream using bit-packed ints. It’s simplest to show in pictorially:

Variable Length Encoding using Bit-Packed Values

Variable length encoding uses as few bytes as possible to represent a field. It’s worth focusing on this for a second. Consider the job of representing an Integer in as few bytes as possible. Integers are typically four bytes but you don’t really need four bytes to represent the number 4. You can do that in a few bits.

PackedInts in Coherence take advantage of this to represents an integer in one to five bytes. The first bit of every byte indicates whether subsequent bytes are needed to represent this number. The second bit of the first byte represents the sign of the number. This means there are six ‘useful’ bits in the first byte and 7 ‘useful’ bits in all subsequent ones, where ‘useful’ means ‘can be used to represent our number’.

Taking an example let’s look at representing the number 25 (11001) as a bit-packed stream:

       25     //Decimal
       11001  //Binary
[0 0 0011001] //POF (leading bits denote: whether more bytes are needed, the sign)

Line 1 shows our decimal, line 2 its binary form. Line 3 shows how it is represented as POF. Note that we have added four zeros to the front of the number denoting that no following bytes are required to represent the number and that the number is positive.

If the number to be encoded is greater than 63 then a second byte is needed. The first bit again signifies whether further bits will be needed to encode the number.  There is no sign-bit in the second byte as it’s implied from the first. Also, just to confuse us a little, the encoding of the numeric value is different to the single-byte encoding used above: the binary number is reversed so the least significant byte is first (the whole number appears reversed across the two bytes). So the number 128 (10000000) would be encoded:

     128                //Decimal
     10000000           //Binary
     00000001           //Binary (reversed)
     000000     00010   //Aligned
[1 0 000000] [0 00010]  //POF

The pattern continues with numbers greater than or equal to 2^13 which need a third byte to represent them. For example 123456 (11110001001000000) would be represented

     123456                          //Decimal
     11110001001000000               //Binary
     00000010010001111               //Reversed
     000000     1001000     0001111  //Aligned
[1 0 000000] [1 0001001] [0 0001111] //POF

Note again that the binary number is reversed and then laid with the least significant bit first (unlike the single btye encoding above).

In this way the average storage is as small as it can be without actually using compression.

Exploring a POF Stream (see Gist)

We can explore a little further by looking at the Coherence API. Lets start with a simple POF object:

    public class PofObject implements PortableObject {
        private Object data;

        PofObject(Object data) {
            this.data = data;
        public void readExternal(PofReader pofReader) throws IOException {
            data = pofReader.readObject(1);
        public void writeExternal(PofWriter pofWriter) throws IOException {
            pofWriter.writeObject(1, data);

We can explore each element in the stream using the readPackedInt() method to read POF integers and we’ll need a readSafeUTF() for the String value:

    SimplePofContext context = new SimplePofContext();
    context.registerUserType(1042, PofObject.class, new PortableObjectSerializer(1042));

    PofObject object = new PofObject("TheData");

    //get the binary &amp; stream
    Binary binary = ExternalizableHelper.toBinary(object, context);
    ReadBuffer.BufferInput stream = binary.getBufferInput();

    System.out.printf("Header btye: %s\n" +
                    "ClassType is: %s\n" +
                    "ClassVersion is: %s\n" +
                    "FieldPofId is: %s\n" +
                    "Field data type is: %s\n" +
                    "Field length is: %s\n",

    System.out.printf("Field Value is: %s\n",
            binary.toBinary(6, "TheData".length() + 1).getBufferInput().readSafeUTF()

Running this code yields:

> Header btye: 21
> ClassType is: 1042
> ClassVersion is: 0
> FieldPofId is: 1
> Field data type is: -15
> Field length is: 7
> Field Value is: TheData

Notice line 25, which reads the UTF String, requires the length as well as the value (it reads bytes 6-15 where 6 is the length and 7-15 are the value).

Finally POF Objects are nested into the stream. So if field 3 is a user’s object, rather than a primitive value, an equivalent POF-stream for the user’s object is nested in the ‘value’ section of the stream, forming a tree that represents the whole object graph.

The code for this is available on Gist and there is more about POF internals in the coherence-bootstrap project on github: PofInternals.java.

Beyond the Data Grid: Coherence, Normalisation, Joins and Linear Scalability (QCon)

Thursday, January 27th, 2011

Normalisation is, in many ways, the antithesis of typical cache design. We tend to denormalise for speed. Building a data store (rather than a cache) is a little different: Manageability, versioning, bi-temporal reconstitution become more important factors. Normalisation helps solve these problems but normalisation in distributed architectures suffers from problems of distributed joins, requiring iterative network calls.

We’ve developed a mechanism for managing normalisation based on a variant of the Star Schema model used in data warehousing. In our implementation Facts are held distributed (partitioned) in the data nodes and Dimensions are replicated throughout the query-processing nodes. To save space we track ‘used’, or as we term them ‘connected’ data, to ensure only useful objects are replicated.

This model was presented at the QCon 2011 and at the Coherence SIG.

You can find the slides here (Powerpoint – 7MB).

See Also:

Coherence: The Fallacy of Linear Scalability

Saturday, December 12th, 2009

I stated in a previous post: “Dessert Island Disks Top 3 reasons for using Coherence have to be: Speed, Scalability and Fault Tolerance”. There are good reasons for this statement (discussed further here) but in some ways it is a little naive, particularly when considering scalability.

The underpinning of Coherence’s scalability is that adding a machine to a cluster proportionally increases the amount of CPU, network bandwidth and storage (memory) available. This is, of course, a fact, however the statement is only really of worth if the mechanism used for reading and writing data scales too.

Coherence uses a Shared Nothing architecture and, as is typical with this style, basic reads (cache.put(key, val)) and writes (cache.get(key)) scale linearly as the number of nodes in the cluster is increased. This scaling leverages the fact that data is partitioned (spread) across the available machines. Any single read or write operation is simply routed to the machine that owns the partition i.e. only one machine is ever asked to service a single ‘get’.get

The problem is that, in real world use cases, ‘get’ and ‘put’ are not enough. Users inevitably evolve more complex access patterns that necessitate the use of queries (by queries I means non-key-based access to data)…  and queries don’t scale in the same way.query

When Coherence executes a query, that query cannot leverage the hashing algorithm used to partition data. Thus the query must be sent to all nodes in the cluster. This implies that ‘the number of queries serviced by the system’ = ‘the number of queries serviced by each machine’. The implications for scalability are obvious.

So how do you manage this problem in Coherence? There are a few techniques you can use:

  • Try to use key based access instead of queries wherever possible.
  • Increase the cluster size so that the amount of data serviced by each node is reduced. This decreases the response time for each request and thus the overall load on each server. It is however somewhat expensive and wasteful.
  • Use a Partition Filter to paginate over the available partitions. This spreads the query over a longer time frame reducing the risk of load spikes.

You may be slightly disappointed with this list as it contains no ‘silver bullet’ solution. The reason is that none of them address the fundamental problem directly, it being intrinsic to the architectural style (shared nothing). Addressing the problem would require a change to the architecture at a macroscopic level. The techniques suggested here are simply tips that help postpone the onset of the problem.

How Fault Tolerant Is Coherence Really?

Wednesday, November 4th, 2009

Dessert Island Disks Top 3 reasons for using Coherence have to be: Speed, Scalability and Fault Tolerance.

When designing systems with Coherence it’s easy to get carried away with the latter, especially when you start to embed your own services and leverage the implicit fault tolerance.

But in all this excitement I’ve often found myself overlooking  what the guarantees really are. FailureMost people know that Coherence backs up your data on another node so that if one process is lost it can be restored (see diagram). They also may know that the number of backups Coherence takes, for each piece of data you store, is configurable. However it takes a little consideration to become totally clear on what guarentees of fault tollerance Coherence really provides, hence my summary here.

There are two questions worth considering:

  1. How many machines failures can occur simultaneously without the cluster loosing data?
  2. What is the maximum reduction in cluster size under which the cluster can operate without data loss?

These two aspects of fault tolerance seem quite similar at first glance but they are driven from very different aspects of the technology. The first refers to concurrent loss of hardware. After a machine is lost Coherence will redistribute backups on the remaining hardware so that every partition has a backup somewhere else. The first question above arises where a second machine is lost before this redistribution phase has had an opportunity to run.

The second question is to do with physical resources, most commonly RAM. If you loose 1/3 of the machines in your cluster do you have enough memory on the rest of them to store a primary and backup copy for the data the lost machines were holding (currently Coherence will try to make a backup even if it means throwing an OutOfMemoryError – something I’m told is being addressed)? Physical memory tends to be the problem here as it is a hard limit (hit a CPU limit and you slow down, hit a memory limit and you get corruption) but hitting a CPU limit is probably equally likely on most clusters. The important point is that you size your cluster with this in mind. That’s to say that you include enough memory headroom for primary and backup copies of the data after the loss of some number of machines (An algorithm for sizing your cluster can be found here).

Having done such analysis however, and I know many teams that do, it’s tempting to then think your cluster can survive the loss of 1/3 of it’s hardware (or whatever resource overhead they provisioned) because there is enough physical resource for Coherence to recover. This would be true if the loss of nodes were separated in time but not if they occurred simultaneously.

For the simultaneous failure of machines, in the current version of Coherence (3.5), you can quantify the products fault tolerance as this:

The limit of Coherence’s fault tolerance is the loss of more than one physical machine in a cluster.

So where does this assumed limit come from? Well Coherence positions backup data based on two conditions:

  • Backup data is placed on a different host to the primary, where possible.
  • Backups of the partitions in a single JVM are spread evenly over the cluster.

The implication is that the loss of a single machine with be handled with the added benefit that the even distribution of backup data across the cluster makes redistribution events rapid (think BitTorrant).

However the loss of a second machine will, most likely, cause data loss if some of the data from the first machine is backed up on the second. The cluster won’t loose much, but it will likely loose some.


One suggestion for combating this is to increase the backup count. Unfortunately, in the current version, this doesn’t help. Coherence is really smart about how it places the first backup copy; putting it on a different machine where possible and spreading the backups evenly across the cluster. But when it comes to the second backup it is not so clever. The problem of backup placement is O(n), hence this restriction. As a result, configuring a second backup provides no extra guarantee that the second backup will be held on a different machine to the first, hence loss of two machines may still cause data loss (but the probability of this has been reduced).

Luckily there is light at the end of the tunnel. The Coherence team are working on smarter tertiary backups, or so I’m told.

Merging Data And Processing: Why it doesn’t “just work”

Sunday, August 30th, 2009

If you’ve been using Coherence for a while (or any other distributed cache service like Gigaspaces or Gemstone) you may well have had that wonderful ‘penny dropping’ moment when considering the collocation of data and processing. Suddenly you can perceive architectures where you no longer need to move all that data around before operating  on it. Your grid already has it there at your disposal.

As a toy example lets consideaffinityr pricing a large portfolio of trades. The pricing algorithm would require trade and market data as input, but as these are logically distinct entities you are likely to store each in a different cache. But for efficiency you’ll need the data for the corresponding trade and the market data on the same node, so that wire calls to collocate them don’t need to be made prior to pricing.

Coherence gives you a great way to do this: Affinity instructs Coherence to store data in a certain way, that is to say it is grouped together so that all data items with the same ‘affinity key’ are kept together (see figures).

Thinking along these lines you’d think we might have solved our pricing problem. We can use affinity to keep the trade and maffinity2arket data together. As it happens this does work (depending somewhat on your data distribution). However it all falls to bits when you need to perform the processing to price the trade.

The problem is that you want to wrap the processing in a Coherence function that is ‘data aware’. Most likely an Aggretator or possibly an Entry Processor. The reasoning being this is that these functions will automatically route themselves to the nodes where the data resides.

The alternative approach is to use an invocable, but this is not data aware so you have to write extra code to route each request to the correct node (perfectly possible but not the most elegant or efficient solution).

So persisting with the data-aware functions as a wrapper for our pricing algorithm, lets say an Aggregator, you would quickly hit a problem with the way that Coherence is architected internally. Aggregators run inside the Cache Service (i.e. the service that manages data in Coherence) and the Cache Service threading model does not permit re-enterant calls [1].

So what does that mean? It means that, if you ran your Aggregator against the trades cache, you would not be able to call out from that Aggregator into the Market Data cache to get the data you require to price the trade. Such a call would ultimately cause a deadlock.

The  coherence-threadingdiagram demonstrates the CacheService threading model under a simulated deadlock. Even when the Cache service is configured with a thread pool there is the possibility that a re-entrant call will be scheduled back to the worker thread that is making that call, particularly in the case where the thread pool is small and the EntryProcessor workload is long.

A work around for this problem is to place the parent cache (or more precisely, the cache against which the Entry Processor or Aggregator is run) in a different Cache Service to the cache that the function is operating on. By splitting into at least two Cache Services the call to the ‘other’ cache will enter via a different Main thread to which invoked the Aggregator that you are currently running. This removes the possibility of deadlock.

InvocableHowever, for our use case, spitting the market data cache and trades cache into different cache services is not an option as it breaks Affinity. The data items will no longer collocate (as affinity is based on the hashing algorithm Coherence uses to store data, and that algorithm is at a cache service level).

So how do you solve this problem. Well you have two options.

  1. You sidestep the threading model by accessing the backing map directly. This way to can access the data in the market data cache using the thread you are on (without Coherence re-queuing it). The problem with this method is that it is a back door and that leaves you open to potential problems (could there be a time when you expect the item to be in the local JVM but it is not?)
  2. As mentioned earlier you wrap your request in an invocable (which does not have the same threading issues as it runs in the Invocation Service) and route it to the correct machine yourself. This is described in the final diagram.

As to which is best to do. Well I guess that depends how risk averse you are ;-) but for what it’s worth I use the former.

[1] http://coherence.oracle.com/display/COH35UG/Constraints+on+Re-entrant+Calls

Coherence Part IV: Merging Data And Processing

Saturday, August 29th, 2009

A lot of people start using Oracle Coherence as a distributed cache because they need to get away from a data-bottlenecking problem. Many of open source NoSQL stores will help you with this problem too (if all you need is to stream large data volumes, being solely in memory is unlikely to be worth the additional hardware cost). However there are some big advantages to being entirely in memory. Distributed execution occurs next to the data it needs to operate on, either on request or as a result of some state change (think trigger), and this is a very powerful tool. This can lead to one of those ‘penny-dropping moment’ as the potential of merging data and processing, particularly in a wholly in-memory architecture, begins to unfold.

The benefits or moving computation to data have been around for a very long time – stored procedures being the classic example. The possibilities are extended significantly when the processing space is actually a distributed data grid, with all logic executing in the same language (in this case Java) and with data represented hierarchically (as objects) rather relationally. Suddenly a whole world of fast distributed processing on collocated data opens up.

Interestingly this is one of the main drivers for MapReduce (e.g. Hadoop): deal with very large data sets in a simple (albeit somewhat brute-force) way, collocating data and processing (although in Hadoop’s case it’s disk based) to allow processing to scale to peta- or exabytes. This same pattern can be applied in Coherence but with a slightly different as the goal: extending your application tier to allow real time processing in virtual address space that can grow to terabytes.

There are a couple of points worthy of note before we go on:

  • The process of Merging Data and Processing is not seamless. The details of this are covered in another post. This article is meant solely as an introduction.
  • There are in fact databases with exactly the same benefits, with respect to merging data and processing. VoltDB is closest (solely in memory, Java stored procedures) but there are many other shared nothing DBs that have impressive performance. Exasol and Paraccel are two worthy of note.

Data Affinity: Ensuring Collocation of Disparate Data Sets

Data affinity allows associations to be set up between data in different caches so that the associated data objects in the two different caches are collocated on the same machine. In the example here trade data and market data are linked via the ticker meaning that all trades for ticker ATT will be stored on the same machine as the ATT market data.


Using Coherence to Run Processing in the Grid

Thus when an Entry Processor or Aggregator executes, say to run a trade pricing routine, it can access the trade and its market data without having to make a wire call as the market data for that particular trade will be held on the same machine (whenever possible).


This presents the possibility of folding the classic service-centric approach in two[1]. Suddenly compute architectures can be merged into one layer that has responsibility for compute and data.  The fundamental advantage being that far less data needs to be transmitted across the wire.

Increased Wire Efficiency

In a standard architecture (the upper example) data is retrieved from a data source and sent to the application tier for processing. However in the Coherence Application-Centric approach (the lower example) the code is sent to the machine that holds the data for execution. This is one of the real penny-dropping concepts that can revolutionise a systems performance.

But it is important to note that Coherence is not a direct substitute for a compute grid such as DataSynapse. Application-Centric Coherence involves leveraging in the inherent distribution Coherence provides as well as its inherent collocation of processing and data.

sending code or data

Thus looking at the anatomy of a simple Application-Centric deployment we see:

  • A feed server enters a trade into the Trade cache using an Entry Processor to execute some pre-processing.
  • This in turn fires a CacheStore which reliably executes some domain processing for that trade on the same machine.
  • The domain processing results in the trade being updated in the cache.

This is just one sample pattern, there are many others. Simply using Aggregators (thing MapReduce) distribute work to collocated data on the grid is a powerful pattern in it’s own right.  All these patterns share the ability to collocate domain processing in a Java across a large, distributed address space. This means that not only is the execution collocated with the data but the executions are implicitly load balanced across the Coherence cluster.


So Coherence has evolved from being a data repository to an application container which provides:

  • Distribution of processing across multiple machines
  • Fault tolerance of data and processing (including async)
  • Scalability to potentially thousands of nodes
  • The ability to collocate data and processing.

An enticing proposition!!!

[1] Service-Centric and Application-Centric are terms coined by Lewis Foti to describe the two broad architectural styles used to build Coherence based systems. Service-Centric architectures use Coherence simply as a data repository. Application-Centric users use Coherence as a framework for building event based distributed systems. Such systems leverage the inherent distribution and fault tolerance that comes with the product with operations being generally collocated with the data they require. This merges the Application and Data layers of the system.

See also:

Coherence Part III: The Coherence Toolbox

Sunday, July 19th, 2009

Coherence is so much more than a hash map. In this article we’ll introduce some of the main functions that a programmer has in their Coherence Toolbox. These include:

  • CQC
  • Near Caching
  • Expiry
  • Entry Processors
  • Triggers
  • Synchronous and Asynchronous Cache Stores

[Edit: There are running code samples that go well with this post in the coherence-bootstrap on github]

Aggregation: Coherence’s MapReduce

For operations that act on data that exists on multiple machines Coherence will parallelise the execution. The example shown here is a summation of “quantities” across a particular cache. Each machine in the cluster performs the summation for their portion of the data. The result of each of these is passed back to the serving node which performs the final summation and returns the final answer to the client. This is analogous to Google’s MapReduce patternparalell

Near Caching: Where the Real Caching is at

All client processes can configure a near cache that sits “in process”. This cache provides an in-process repository of values recently requested. Coherence takes responsibility for keeping the data in each near cache coherent.

  • Thus in the example shown here Client A requests key1 from the cluster. This is returned and the key-value pair are stored in the client’s in-process near cache.
  • Next Client B writes a new value to key1 from a different process. Coherence messages all other clients that have the value near cached notifying them that the value for key1 has changed. Note that this is dynamic invalidation, the new value is not passed in the message.
  • Should Client A make a subsequent request for key1 this will fall through to the server to retrieve the latest value.

Thus Near Caching is a great way to store data which may be needed again by a client process.


Continuous Query: Pub-Sub Queries at our Fingertips

In contrast to near caching, should a client application be interested in all updates made to a certain data set, a Continuous Query would be preferable. Continuous queries are used to define a query that will be proactively kept up to date by the cluster as data within it changes. Looking at the example:

  • Client A initialises a ContinuousQueryCache using a filter that defines a subset of the entries in the cache, in this case all trades with the ticker “ATT”. The resulting dataset is held locally in the client’s process.
  • Next Client B writes a value which is included in Client A’s continuous query. The cluster runs the continuous query filter against Client B’s write (if it is new) and determines its relevance to Client A’s continuous query. It messages both the key and value back to Client A, updating its locally cached copy with the new value.

Thus Continuous queries provide a proactively updated in-process data set to clients. Typical usages include:

  • A trading blotter containing trades for a certain trader/book.
  • Ticking prices for a certain Currency.


Thus in summary near caches receive invalidations only, with subsequent requests falling through to the server to get the changed data. Conversely continuous queries receive updates containing all new and changed data. So when might you use each of these? Use near caches by default for cases where there is likely to be reuse. Use continuous queries when it is known that all changes to a certain data set will be relevant to clients.

Expiry: Making Sure You Don’t Run Out of Memory

The cache types, Partitioned, Replicated and Near all support expiration policies for removing entries automatically from the cache. There are a set of basic expiration policies such as Most Recently Used, Least Frequently Used etc. Custom expiration policies, written in Java, can also be defined.

In the example here a client has a near cache configured to keep the most recent 1000 tuples. The partitioned backing cache on the server has a different expiration policy set that expires entries once they reach a certain age.


Indexes: Not So Much About Lookup Speed As Avoiding Deserialisation.

Coherence allows the addition of indexes to speed up access to objects via attributes other than the key of the HashMap. In the example here the Trade cache, which is keyed by Trade ID has an additional index added to the counterparties method on the trade object. Also note that, in this case, the counterparties method returns multiple values so the resulting index contains more entries than the cache itself.

Accessing data via its key actually turns out to be several times faster than accessing it via an index. The reason for this is two fold:

  • Queries performed against a key can be directed straight to the node that the key is stored on via the well known hashing algorithm. Index queries however must be sent to all nodes. Although this is done in parallel the transaction must wait for the all responses.
  • Keys are unique. Indexes in Coherence solve the general case (any cardinality) and as such are less efficient.

However the real boon of Coherence indexes is that in creating an index Coherence deserialises the object and caches the deserialised index key. Thus when computing the query each object does not need to be deserialised to look for a match.

Entry Processors: Avoiding Locking

Locking keys directly is supported in Coherence, but it is expensive. In the example here a client locks a key, performs an action and then unlocks it again. This takes a scary 12 network hops to complete. Fortunately, there is a better way…


Entry processors solve this distributed locking problem by executing a predefined piece of code, on the server, against a certain key. They represent one of the four primary constructs that Coherence offers and have the following properties:

  • They execute on the machine that the key is located on.
  • They execute synchronously with respect to that key (i.e. the key is write-locked during the execution of the Entry Processor).
  • They code they run has full access to the key and entry.


In this example the client invokes an Entry Processor against a  specific key in the cache.

  • A serialised version of the entry processor is passed from the client to the cluster.
  • The cluster locks the key and executes the passed Entry Processor code. The Entry Processor performs the set of actions defined in the process() method.
  • The cluster unlocks the key.

Thus an arbitrary piece of code is run against a key on the server.

Here we see an example of an entry processor, the ValueChangingEntryProcessor which updates the value associated with a certain key. Note that in contrast to the locking example described on a previous slide, this execution involves only 4 rather than 12 network hops.



class ValueChangingEntryProcessor extends AbstractProcessor {

   private String newValue;

   public ValueChangingEntryProcessor(String newValue) {

      this.newValue = newValue;


   public Object process(InvocableMap.Entry entry) {


      return "The value has been set to " + newValue;



Invocables: Making Yourself a Little Compute Grid

Invocables are the second of the four primary constructs and are analogous to a DataSynapse grid task in that they allow an arbitrary piece of code to be run on the server. Invocables are similar to Entry Processors except that they are not associated with any particular key. As such they can be defined to run against a single machine or across the whole cluster.

In the example here an Invocable is used to invoke a garbage collection on all nodes on the cluster. Other good examples of the use of Invocables are the bulk loading of data, with Invocables being used to parallelise the execution across the available machines. invocables

Server Side Eventing I: Triggers

Triggers are the third of the four primary constructs and are analogous to triggers in a database. In the example here the client writes a tuple to the cache and in response to this event a Trigger fires, executing some user defined code. The code is executed synchronously, that is to say that the key is locked for the duration of the execution.

Server Side Eventing I: Cache Stores

The last of the four primary constructs is the CacheStore. CacheStores are usually used to persist data to a database and contain built in retry logic should an exception be thrown during their execution.

Looking at the example here:

  • The client writes a tuple to the cache.
  • This event causes a CacheStore to fire in an attempt to persist the tuple. Note that this may be executed synchronously or asynchronously.
  • In this case the user defined code in the CacheStore throws an throws an exception.
  • The CacheStore catches the exception and adds the store event to a retry queue.
  • A defined period of time later the cache store is called again. This time the execution succeeds and the tuple is written to the database.

The retry queue is fault tolerant. So long as the cluster is up it will continue to retry store events until they succeed.

Should multiple values be received for the same key during the write delay of an asynchronous CacheStore the values will be coalesced, that is to say that only the most recent tuple will be persisted. This coalescing also applies to the retry queue.


Your Coherence Toolbox

Thus, to summarise the four primary constructs:

  • Both Entry Processors and Invocables are called from the client but run on the server. They both except parameters during construction and can return values after their execution.
  • Triggers/BackingMapListeners and CacheStores both run on the cluster in response to cache events.
  • Triggers/BackingMapListeners, like Entry Processors, lock on the key for which they are executing. Synchronous cache stores also lock but their use in asynchronous mode tends to be more common.
  • Cache stores are guaranteed, in that they will retry should execution fail and this retry logic is fault tolerant (it will retry on a different machine should the one it is running on fail). They also coalesce changes.


See also:

Coherence Part II: Delving a Little Deeper

Saturday, May 16th, 2009

Coherence: A Shared Nothing Architecture

Although Oracle Coherence may have a simple interface, behind it lies a some pretty cool tech. The heart of Coherence’s primary storage unit, the distributed cache, is it’s data partitioning algorithm. This is analogous to Horizontal Partitioning or Sharding in database terminology. Vertical partitioning is the corollary of Horizontal partitioning, where database tables are split, by columns, into different tables (this process being called Normalisation). In Horizontal Partitioning tables are broken up into sets of rows through a partitioning algorithm, usually defined by the user. This the the fundamental concept behind any partitioned database (such as RAC or Terradata).

Unlike some simple clustered data repositories, which rely on copies of the dataset being held on each machine, Coherence spreads its data across the cluster using a Horizontal Partitioning strategy based on a hash its key. Thus each machine is responsible for its own portion of the data set.

Thus, in the example seen below, the user requests the key “2” from the cache (note that a cache is analogous to a table in a database, it is single HashMap instance). The query for key “2” is directed to the single machine on which the data resides. In this case the node in the top left corner.

A subsequent request for key “334” is routed to the machine in the bottom left corner as it is this machine which is responsible for that key.


Although the main storage mode is the partitioned cache, where the data is distributed across all machines in the cluster. It also supports the simpler case of the replicated cache, where each node has its own copy of the entire data set.

So when do you think a replicated cache might be the appropriate choice?

Well, the advantage of a replicated cache is that the data will always be held in-process. The downside is that writes to it must be sent to all machines and such actions are slow and arduous. Thus in general:

  • Use a partitioned cache for general data storage.
  • Use a replicated cache for fairly low volume, static data that needs to be used “in process” on the server.

The advantages of in-process data on the server will become apparent later on when we consider running code on the Coherence cluster itself. When performing such server-side executions having access to in process data (rather than having to make a wire call) becomes invaluable. More on this later…

How it Works: Reading and Writing

Lets look at what happens during a simple data retrieval operation. Here the client invokes a “get” operation to retrieve the value for the key:Key1. The request is directed to a connection proxy on the server. This manages the connection as well as forwarding the request on to the machine which it knows contains the key: Key1. It does this via the “Well known hashing algorithm”.retrieving data

The Well Known Hashing Algorithm is the algorithm used to determine on which machine each hash bucket will be stored. This algorithm is distributed to all members of the cluster, hence “well known”. This has the effect that the location of all keys are known to all nodes.

well known hashing

Now looking at writing data to the cluster, the format is similar to gets with the put travelling through a connection proxy which locates the data to be written and forwards on the write. The difference is that writes must also be written to the backup copy which will exist on a different machine. This adds two extra network hops to the transaction.


How it works: Communication Protocols

Coherence uses different protocols to communicate between different services.

  • Client connect to via a TCP/IP based protocol called TCP*Extend.
  • Cluster members use multicast based messaging to discover new cluster members and to heartbeat.
  • Cluster members use a custom protocol, built on top of UDP for reliable communication within the cluster. As a result the protocol management usually performed in layers above the Transport layer of the network stack – most notably packet ordering and reliable delivery – are managed by Coherence itself in the Java cache-server process.

Coherence includes a clever mechanism for detecting and responding to node failure. In the example given here node X suffers a critical failure due to say a network outage or machine failure. The surrounding cluster members broadcast alerts stating that they have not heard from Node X for some period of time. If several nodes raise alerts about the same machine a group decision is made to orphan the lost node from the cluster.

Once Node X has been removed from the cluster the backup of its data, seen here on the node to its left, is instantly promoted to being a Primary store. This is quickly followed by the redistribution of data around the cluster to fully backup all data and to ensure there is an even distribution across the cluster. The redistribution step is throttled to ensure it does not swamp cluster communication. However this step completes more quickly on larger clusters where less data must be redistributed to each node.


Coherence has a propiatary object serialisation and communication protocal called PIF/POF standing for Portable Invocation Format and Portable Object Format respectively. POF is particuarly important as apart from being highly compressed (when compared to Java serialisation) it allows deserialisation into C++ and .NET Coherence clients. There is a detailed post on the internals of POF here.

In the example the C# client defines the POF serialisation routine which is executed by the IPofSerialiser (written in C#) to create a POF object which is stored in the cluster. When a Java client requests the same object it is inflated with the PofSerialiser (written in Java) to create a comparable Java object.


The previous slide covered the marshalling of data from one language to another. However non-Java clients also need to execute code on the cluster and, as the cluster is written in Java, any executions run there must also be in Java. To solve this problem server side code, such as the Invocable shown here, is mapped from a C# implementation on the client to a Java implementation on the server. Thus calling MyInvovable in C# will result in the Java version of MyInvocable being run on the server with the objects it uses being marshalled from one language to another via POF (as described in the previous slide).


Client Types

There are two types of client in Coherence:

  • Extend Client: Connects to the cluster via TCP*Extend which is a protocol based on TCP-IP. This is the typical means for connecting to the cluster, is lightweight and scalable.
  • Compute Clients: These are cluster members running in a data-disabled mode. They are heavier processes needing tens of seconds to initialise as part of the cluster. However they are faster as they know the location of data (via the well known hashing algorithm).


Monitoring: Boring but Necessary

Monitoring of Coherence is done via inspection of the MBeans it publishes over JMX. JConsole tends to the be tool used to do this although there are a variety of other alternatives including RTView which presents a much richer interface onto the information Coherence produces.

The Coherence JMX implementation includes Mbean publication from each cluster member which is collated via a singe nominated JMX Collector node. The JMX Collector makes all MBeans available to users via JMX.


See also:

Coherence Part I: An Introduction

Wednesday, March 4th, 2009


You can think of Coherence as simply being a distributed cache. It is after all what it was designed to do. But doing so would be something of an injustice. If a caching layer is all you need there are probably cheaper options. What you get with Coherence is a well thought out, simple framework for dealing with distributed data.

In one dimension it has moved towards the traditional  database space, offering query functionality, indexing etc. In another it has encroached on the world of the application container by providing a framework for low latency, highly available, distributed systems in Java. It is its evolution into both of these, traditionally disparate, technology spaces that make it such a unique and useful product to use.

Coherence is still a traditional distributed cache under the covers, and is a pretty good one at that. So if you simply require fast access to prefabricated data (that is to say data that has been pre-processed into the required form), and you work in one of the 3 main languages (particularly Java), Coherence is still likely to be a decent choice, but there are quite a few cheaper alternatives these days, so bear that in mind.

It’s also important to understand the limits of the technology and Coherence certainly has its limits (for example). A large proportion of Coherence’s performance and scalability gains come from it’s adoption of a shared nothing architecture (I’ve written more on shared nothing architectures here). This means it excels in certain situations and quite the opposite in others. Learning to use the technology is about learning its limits. It should be one of the many tools in your architectural toolbox, but a fantastic tool to have.

Coherence is laid out over three distinct layers; client, cluster, persistence (see opening figure). The Coherence cluster itself is sandwiched between the client on the left and the persistent data source on the right. The client has it’s own, in process, 2nd level cache. The persistent data source is usually only used for data writes, it does not contribute to data retrieval (as the cluster, in the centre of the diagram, will typically be pre-populated with data, but more on that later).


Coherence has three major things going for it; it is fast, fault tolerant and scalable. Lets look at each of these in turn…

Coherence is Fast

Coherence’s speed can be attributed to five major attributes of it’s design:

  1. It stores all data solely in memory. There is no need to go to disk.
  2. Objects are always held in their serialised form (using an efficient binary encoding named POF – find out more about this here). Holding data in a serialised form allows Coherence to skip the serialisation step on the server meaning that data requests only have one serialisation hit, occurring when they are deserialised on the client after a response. Note that both keys and values are held in their serialised form (and in fact the hash code has to be cached as a result of this).
  3. Writes to the database are usually performed asynchronously (this is configurable). Asynchronous persistence of data is desirable as it means Coherence does not have to wait for disk access on a potentially bottlenecked resource. As we’ll see later it also does some clever stuff to batch writes to persistent stores to make them more efficient. The result of asynchronous database access is that writes to the Coherence cluster are fast and will stay fast as the cluster scales. The downside being that data could be lost should a critical failure occur. As a result you should only use this asynchronous behaviour for data you don’t mind loosing.
  4. Queries use indexes which are sharded across the data grid. Thus queries follow a divide and conquer approach.
  5. Coherence includes a second level cache that sits in process on the client. This is a analogous to a typical caching layer, holding an in-process copy. This copy can be kept coherent either via setting a near-cache to be ‘present’ or via using a ‘continuous query’fast

Coherence is Fault Tolerant

Coherence is both fault tolerant and highly available. That is to say that the loss of a single machine will not significantly impact the operation of the cluster. The reason for this resilience is that loss of a single node will result in a seamless failover to a backup copy held elsewhere in the cluster. All operations that were running on the node when it went down will also be re-executed elsewhere.

It is worth emphasizing that this is one of the most powerful features of the product. Coherence will efficiently detect node loss and deal with it. It also deals with the addition of new nodes in the same seamless manor.


Coherence is Scalable

Coherence holds data on only one machine (two if you include the backup). Thus adding new machines to the cluster increases the storage capacity by a factor of 1/n, where n is the number of nodes. CPU and bandwidth capacity will obviously be increased too as machines are added. This allows the cluster to scale linearly through the simple addition of commodity hardware. There is no need to buy bigger an bigger boxes. It should be noted that scalability only comes with key-based access. As noted previously (here) queries will not scale linearly as you increase the number of nodes.

So we can summarise why Coherence is faster than traditional data repositories.

  • Coherence works to a simpler contract. It is efficscalable-chartient only for simple data access. As such it can do this one job quickly and scalably.
  • Databases are constrained by the wealth of features they must implement. Most notably (from a latency perspective) ACID.
  • High performance users are often happy to sacrifice ACID transactions for speed and scalability.

So What Is Coherence Really?

Most importantly, Coherence is just a map. All data is stored as key value pairs. It offers ‘some’ functionality that goes beyond this but it is still the fundamental structure of the product and hash based access to the key/value pairs it contains is fundamental to the way it works at the lowest level.


In a typical installation Coherence will be prepopulated with data so that the cluster become the primary data source rather than just a caching layer sitting above it (Coherence offers both modes of operation, it just so happens that almost everyone I know does it this way). The main reason that ‘read through’ is not often used is that (i) it adds latency to early client transactions and (ii) the map contains in indeterminate quantity of data meaning that searches (queries) against the cache will return indeterminate results.

not read through

Coherence is not a database. It is a much lighter-weight product designed for fast data retrieval operations. Databases provide a variety of additional functionality which Coherence does not support including ACID (Atomic, Consistent, Isolated and Durable), the joining of data in different caches (or tables) and all the features of the SQL language.

Coherence is not a Database

Coherence does however support an object based query language which is not dissimilar to SQL. There is now even an SQL-like declarative language you can use too. However Coherence is not suited to complex data operations or long transactions. It is designed for fast data access via lookups based on simple attributes e.g. retrieving a trade by its trade ID, writing a new trade, retrieving trades in a date range etc as well as executing data-centric custom functions (more to come on this later)

not a db

Coherence does not support:

  • Transactions (ACID)*
  • Joins
  • SQL**

* There is now (as of 3.6 I think) support for transactional caches. I’ve not used them to be honest and they have a number of restrictions. If you need transactions though you should probably look at alternative technologies.

** Coherence does support a simpler, object based query language but it is important to note that coherence does not lend itself to certain types of query, in particular large joins across multiple fact tables.  There is now a newer declarative language option too.

Comparing Coherence with Other High Performance Data Repositories

Now lets compare Coherence with some other prominent products in the Oracle suite. Firstly lets look at the relationship with Oracle RAC (Real Application Cluster).

RAC is a clustered database technology. Being clustering it, like Coherence, is fault tolerant and highly available – that is to say that loss of a single machine will not significantly effect the running of the application. However, unlike Coherence, RAC is durable to almost any failure as data is persisted to (potentially several different) disks. However Coherence’s lack of disk access makes it significantly faster and thus the choice for many highly performant applications. Finally RAC supports SQL and thus can handle complex data processing. RAC however is limited by the fact that it is a Shared Disk Architecture, whereas Coherence is Shared Nothing (This difference is beyond the scope of this article but is discussed in full here).

racTimesTen is a totally different Oracle technology. It is a completely in-memory implementation of an Oracle database supporting most standard database functionality, but at much lower latency.

The support for in memory storage is clearly a feature of both TimesTen and Coherence thus making them both suitable for low latency applications.

However the big advantage of using Coherence is that it is distributed i.e. the data is spread across multiple machines. TimesTen is restricted to a single process and thus is neither highly available nor scalable beyond the confines of a single machine (although it can be configured for fault tolerance).

However TimesTen offers most of the support that a database offers including:

  • Transactions
  • Complex query language (SQL) joins etc
  • Heavily optimised query execution.

This makes it the obvious choice if complex data processing is required or there is an existing dependence on SQL.

x10The other comparable technological space is the Shared Nothing database. These are databases that share the same architectural style where each node has sole ownership of the data it holds. Such systems are currently used for a rather different use case; data warehousing as apposed to OLTP applications. However this is likely to change in the near future. You can find more discussion of Shared Nothing databases here. My SNDB of choice is ParAccel.

Finally Coherence there are a number of other competitors out there which are pretty good. If you’re reading this today (I’m updating this in 2013) you should be checking out some of the open source alternatives. Hazlecast is the most obvious which now has a mature and well funded project that plays in the same product space. Gemfire, Terracotta and Gigaspaces are the direct competitors. If you are just looking for scalable caching layers with query semantics you might be better looking at a NoSQL disk based solution. These are much cheaper to run in the long term and keeping all your data in memory is often overkill if you are not operating on it directly. Check out MongoDB and Couchbase which are the two NoSQLs most closely related and both open source.

See also: