Archive for the ‘Coherence Patterns’ Category

Cluster Time and Consistent Snapshotting

Wednesday, May 9th, 2012

Banks, in particular, often need snapshotting, where a snapshot represents an immutable set of data. Ultimately this is a set of versioned keys describing data that can be requested again and again and will always come back the same.

One approach to snapshotting is to simply copy a set of versioned keys somewhere to represent the snapshot. This works for small datasets but it is quite limited as an approach. A better approach is to use time to snapshot your data and this is the mechanism used by databases that implement snapshot isolation as a concurrency control mechanism.

This is the approach used in bi-temporal databases using the concept of tuple-versioning.

Let us first assume you have implemented versioning of your data so that it is immutable. Next the versioned objects can be augmented with two times that represent system times when the version arrived and when it was replaced (if you do the same with business time you will have a truly bi-temporal store):

public interface MyBusinessObject{
   public Date arrivedAt();
   public Date supersededAt();
}

arrivedAt() corresponds to the time the version is written; supersededAt() is the time the version is replaced (or null/infinite if it is the most recent version).

This allows queries to be written that isolate a snapshot of the system at a point in time.

select * from system where arrivedAt <= desiredTime and (supersededAt > desiredTime or supersededAt is null)

This is a fairly simple and widely implemented concept.

The problem with this approach is that you need an accurate and consistent implementation of time in a distributed environment, where system clocks vary.

In Coherence and other distributed systems you do not always have a consistent time at your fingertips. As with most approaches there is an easy way and a harder way.

The Easy (trivial) Way

Keep it simple and channel all writes through a single process. Use the system clock of this single writer process to source time. If the process restarts it needs to check the last written time to ensure time doesn’t go backwards (as it can with NTP or simple time variance across machines).
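A minimal sketch of such a writer-process clock, under the assumption that the last written time is recovered and passed in on restart (the class and names are illustrative, not taken from any product):

public class MonotonicWriterClock {
   private long lastIssued;

   public MonotonicWriterClock(long lastWrittenTime) {
      this.lastIssued = lastWrittenTime;   // recovered on restart so time never goes backwards
   }

   public synchronized long next() {
      long now = System.currentTimeMillis();
      if (now <= lastIssued) {
         now = lastIssued + 1;   // system clock stepped back (NTP etc.): never reissue an old time
      }
      lastIssued = now;
      return now;
   }
}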

The Harder Way

This approach is better if you want to embrace distribution and not write all your data through a single process. This is a good idea for any system that supports heavy write workloads.

The problem with applying tuple-versioning is that you need an accurate concept of cluster time: a time that is consistent across all nodes and does not go backwards. Coherence already has a cluster time concept that does not go backwards, but it is not sufficient for this case as it does not guarantee that writes occur at the same temporal point (that is to say, there can be variation across the nodes nonetheless).

Implementing cluster time for the purpose of snapshotting in a truly distributed environment requires a trick. The trick is to view the system in epochs, where each epoch is consistent. In addition we use two concepts of time, write-time and read-time, with the condition that write-time is always greater than read-time across the cluster.

Trick: Have two times: write and read time. Write-time is always greater than read-time across the cluster so that, at any read-time-snapshot, all writes for that snapshot will have been completed.

Using this trick you can guarantee that reads done using read-time will be repeatable and consistent at any node in the cluster.

To understand how this works we’re going to introduce the concept of a cluster-clock. The clock ticks along setting write-time and read-time on all nodes. The condition it will uphold is that at any read time all writes for that time will have been completed.

The implementation of this cluster-clock takes a little thinking about. Firstly you need a singleton service for the clock to live in. There will only be one clock running in the cluster and this needs to be fault tolerant. The singleton-service pattern for doing this is described here.

Next the clock needs to ensure the above condition: that all writes for any published read-time have already completed. Do this by iterating over all nodes in the cluster in turn:

  1. Iterate over all nodes in the cluster setting write time = T
  2. Iterate over all nodes in the cluster setting read time = T-1 (as we now know no nodes will still be writing with T-1)
  3. Iterate over all nodes in the cluster setting write time = T+1
  4. Iterate over all nodes in the cluster setting read time = T (as we now know no nodes will still be writing with T)
  5. continue to loop….

This is described in the figure. We originally attempted this using replicated caches to hold the times but unfortunately this does not work. Replicated caches do not behave quite as you might think: they do not wait for data to be synchronised with all nodes.

The approach we use now is an invocable that is synchronously broadcast to the cluster. The invocable performs the ticks on each node.
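A minimal sketch of this, assuming an invocation service named “ClockInvocationService” is configured and using an illustrative LocalClock holder for the per-node times (these names are not from the ODC implementation):

import com.tangosol.net.AbstractInvocable;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.InvocationService;

// Illustrative static holder, read by the triggers on each storage node
class LocalClock {
   private static volatile long writeTime;
   private static volatile long readTime;
   static void setWriteTime(long t) { writeTime = t; }
   static void setReadTime(long t)  { readTime = t; }
   static long getWriteTime()       { return writeTime; }
   static long getReadTime()        { return readTime; }
}

// Sets the local write- and/or read-time on whichever node it runs on (-1 = leave unchanged)
class ClockTickInvocable extends AbstractInvocable implements java.io.Serializable {
   private final long writeTime;
   private final long readTime;

   ClockTickInvocable(long writeTime, long readTime) {
      this.writeTime = writeTime;
      this.readTime = readTime;
   }

   public void run() {
      if (writeTime >= 0) LocalClock.setWriteTime(writeTime);
      if (readTime >= 0)  LocalClock.setReadTime(readTime);
   }
}

// The clock loop itself, run from inside the singleton service described later in this archive
class ClusterClock {
   void runClock(long lastKnownEpoch) throws InterruptedException {
      InvocationService svc =
            (InvocationService) CacheFactory.getService("ClockInvocationService");
      for (long t = lastKnownEpoch + 1; ; t++) {
         svc.query(new ClockTickInvocable(t, -1), null);     // 1. every node now writes with T
         svc.query(new ClockTickInvocable(-1, t - 1), null); // 2. safe: no node still writes with T-1
         Thread.sleep(1000);                                 // ticks are at least one second apart
      }
   }
}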

As writes enter the system, object version numbers along with arrivedAt/supersededAt data are written through triggers. The triggers access the current write-time on the data node and use it to set arrivedAt() and supersededAt() appropriately.
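A sketch of such a trigger, assuming a hypothetical TemporalValue type with long-based arrivedAt/supersededAt setters and the illustrative LocalClock holder above (the real ODC trigger also handles versioning and superseding the previous entry):

import com.tangosol.util.MapTrigger;

public class TemporalStampTrigger implements MapTrigger, java.io.Serializable {

   public void process(MapTrigger.Entry entry) {
      TemporalValue incoming = (TemporalValue) entry.getValue();   // TemporalValue is assumed

      incoming.setArrivedAt(LocalClock.getWriteTime());   // new version arrives at this node's write-time
      incoming.setSupersededAt(Long.MAX_VALUE);           // and is not yet superseded ("infinite")
      entry.setValue(incoming);

      // The version being replaced is stamped supersededAt = the same write-time as it is copied
      // to the versioned cache (see the Latest/Versioned pattern later in this archive)
   }
}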

Snapshot queries then work by querying objects where arrivedAt() <= readTime and supersededAt() > readTime (or supersededAt() is not yet set).

The read-time corresponds to a snapshot. To take a snapshot of the system, you simply ask it for the current read-time. In our case the read-time is taken from the extend proxy the client connects to.

Queries using this snapshot of read-time in this manner are guaranteed to be repeatable.

Finally, this mechanism can return multiple versions, of which all but one must be removed. The cluster time is in fact more like an epoch, so multiple versions can exist within each ‘tick’. In ODC we standardise the ticks of the clock to be every second (they will be at least one second apart, possibly more, particularly if nodes are doing garbage collection).

This means your snapshot queries need to take into account that they may return multiple objects matching the read-time epoch. This is simply a case of selecting only the object with the highest version (assuming you are implementing versioning on your objects using triggers as described here). We do this as part of the query by using an aggregator.
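For reference, the filter half of such a query might look like the sketch below, extracting via the arrivedAt()/supersededAt() accessors of the interface above; reducing each business key to its highest version is then done by a custom EntryAggregator run over this filter (not shown):

import java.util.Date;
import com.tangosol.util.Filter;
import com.tangosol.util.filter.AndFilter;
import com.tangosol.util.filter.GreaterFilter;
import com.tangosol.util.filter.IsNullFilter;
import com.tangosol.util.filter.LessEqualsFilter;
import com.tangosol.util.filter.OrFilter;

public class Snapshots {

   public static Filter atReadTime(Date readTime) {
      return new AndFilter(
         new LessEqualsFilter("arrivedAt", readTime),      // arrived at or before the snapshot
         new OrFilter(
            new GreaterFilter("supersededAt", readTime),   // superseded after the snapshot...
            new IsNullFilter("supersededAt")));            // ...or not yet superseded at all
   }
}

A query such as cache.entrySet(Snapshots.atReadTime(readTime)) is then repeatable for any read-time the cluster has published.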


GUI Sorting and Pagination with Chained CQCs

Sunday, April 15th, 2012

This is a simple trick, useful for cleanly implementing GUIs where the memory availability varies across different layers.

A common pattern is for a Coherence cache to contain data of which a GUI requires some subset. Individual screens then require further filtration.

Chaining CQCs is a simple and elegant way of doing this as it lets you decide, through config, where each CQC will reside.

Taking the rich client example you might request all trades for a desk be sent to the client (this analogy works equally well for a webserver process). This can be implemented as a CQC selecting all trades in a desk.

A trader then applies a subsequent filter to his blotter to restrict it to only three books. If you implement this as a further CQC, run over the first CQC, it will apply the filtering for you (the objects will not be duplicated but the matching keys will be).
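A sketch of the chaining, assuming trade objects expose getDesk() and getBook() (illustrative names); whether each CQC is constructed in the rich client, the webserver or the cluster is purely a deployment choice:

import java.util.Arrays;
import java.util.HashSet;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.cache.ContinuousQueryCache;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.InFilter;

public class BlotterViews {

   public ContinuousQueryCache blotterFor(String desk, String... books) {
      NamedCache trades = CacheFactory.getCache("trades");

      // First CQC: all trades for the desk, values held locally where memory allows
      ContinuousQueryCache deskView =
            new ContinuousQueryCache(trades, new EqualsFilter("getDesk", desk), true);

      // Second CQC: the trader's book filter, chained over the first; the trade objects are
      // not duplicated again, only the matching keys are
      return new ContinuousQueryCache(
            deskView, new InFilter("getBook", new HashSet(Arrays.asList(books))), false);
   }
}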

The nice thing about this pattern is that where you place each CQC (in the client, the webserver or on the cache itself) is up to you, and you can thus easily tailor it according to the memory availability and latency requirements of each layer. Moving them around is just config.

Credit to Damian Guy / Jon Knight for coming up with this neat idea.



The Collections Cache

Monday, November 7th, 2011

This is a very simple pattern that can be used to solve a variety of problems. The structure uses a “Collections Cache”: a cache that appends values to a collection using a Trigger. You can then access the entire collection using a get() or alternatively use an EntryProcessor to extract a certain value from the collection.
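A minimal sketch of the idea (assumed, not the ODC code): the client put()s a single value and a trigger swaps it for the collection already stored under that key, with the new value appended.

import java.util.ArrayList;
import java.util.List;
import com.tangosol.net.NamedCache;
import com.tangosol.util.MapTrigger;
import com.tangosol.util.MapTriggerListener;

public class AppendToCollectionTrigger implements MapTrigger, java.io.Serializable {

   public void process(MapTrigger.Entry entry) {
      Object newValue = entry.getValue();

      List collection = entry.isOriginalPresent()
            ? new ArrayList((List) entry.getOriginalValue())   // the collection already stored
            : new ArrayList();                                  // first value for this key

      collection.add(newValue);    // append the value that was put()
      entry.setValue(collection);  // so a get() returns the whole collection
   }

   // Registration (this can equally be done in the cache configuration)
   public static void registerWith(NamedCache cache) {
      cache.addMapListener(new MapTriggerListener(new AppendToCollectionTrigger()));
   }
}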

The pattern is used on ODC to track aggregate views. Say you want a materialised view of trades grouped by book.  We keep a reverse index of trade references for each book. The view is updated asynchronously as data is added to the cache using an async CacheStore.  The pattern is applicable to a variety of other use cases, one being an approach to managing version history.

One downside of this pattern is that, if the concept is not well known, it can be confusing: after all, the object you put() is not of the same type as the one returned from a get(). Simple naming, such as a collections* prefix, can help avoid this confusion though.


Singleton Service

Saturday, November 5th, 2011

Being a data grid, Coherence is very good at doing things in a distributed way across all nodes in the cluster. However it doesn’t offer any functionality (currently) for running a service just the once, in a reliable manner. Most applications solve this problem by simply running another process, for example you might start a second process that reads data off some queue and keeps your cluster up to date. It’d be nice however if you could leverage Coherence’s fault tolerance to ensure that, if the cluster was running, your QueueListener was always running too. In fact this is fairly simple to do and can be used for a host of common applications including loading data, keeping it up to date, adding indexes and regulating a cluster wide time stamp (article to follow).

What we want is a service that will always run on one of our Coherence nodes no matter what happens to the cluster.

This solution is conceptually simple. You have lots of processes in your cluster. When each node starts it simply checks whether the service has already been started elsewhere by attempting to lock a fictitious, well-known key:

lockCache.lock("SingletonLockKey");

Only one of the processes in the cluster will attain the lock. If it does attain it then it starts the Singleton Service, adds indexes, loads data or whatever. Simple. If the node running the service dies then the lock is released and another process will acquire it and start the singleton service there.

//Run in a new thread on a wrapped DefaultCacheServer i.e. should run on every node
long blockUntilLockAcquired = -1; // -1 means block until the lock is obtained
boolean locked = lockCache.lock("SingletonLockKey", blockUntilLockAcquired);
if (locked) {
   //only one node in the cluster gets here: start singletons, add indexes, load data etc.
   //hold the lock for the lifetime of this node; it is released automatically if the node
   //dies, at which point another node's blocked lock() call returns and starts the service there
}

Reliable version of putAll()

Friday, November 4th, 2011

I like triggers in Coherence. They allow us to do lots of cool stuff to our objects as we add them to the cache. Implement versioning, stamp them with cluster time, save them to a messaging system, check for duplicate writes, check for concurrent writes … the list goes on. But with all this processing comes the risk of failure and Coherence provides little in the way of exception reporting. In fact it provides no information on the individual failures, something that quickly becomes a problem as the level of trigger functionality increases. On ODC this caused us a real problem so we re-implemented putAll() so that it correctly reported those writes that failed. Credit goes to Jonathan Knight and Andrew Wilson for working this implementation through.

The pattern is pretty simple at a high level. It involves two Invocables. The first simply executes on the extend proxy, as we need to be inside the cluster to get access to the key assignment strategy. The next step is to split the data being written into the subsets applicable to each node using getKeyOwner(). These subsets are then sent, via a second Invocable, to the members that own them and EntryProcessors are used to do the write to the backing map directly (although this is no longer needed in 3.7). This is shown pictorially below.
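The shape of the first step is sketched below (illustrative, not the coherence-bootstrap code), assuming the target is a distributed (partitioned) cache; each chunk is then sent to its owner with the second Invocable, whose result carries any per-key exceptions back for collation on the proxy:

import java.util.HashMap;
import java.util.Map;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.Member;
import com.tangosol.net.PartitionedService;

public class PutAllSplitter {

   public Map<Member, Map> splitByOwner(String cacheName, Map entries) {
      PartitionedService service =
            (PartitionedService) CacheFactory.getCache(cacheName).getCacheService();

      Map<Member, Map> perMember = new HashMap<Member, Map>();
      for (Object o : entries.entrySet()) {
         Map.Entry e = (Map.Entry) o;
         Member owner = service.getKeyOwner(e.getKey());   // the member this key lives on
         Map chunk = perMember.get(owner);
         if (chunk == null) {
            perMember.put(owner, chunk = new HashMap());
         }
         chunk.put(e.getKey(), e.getValue());
      }
      return perMember;
   }
}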

You can view the code for doing this in the coherence-bootstrap project on Github: PutAllThatReportsIndividualExceptions.java

[Edit Jan '12] My colleague Jon ‘The Gridman’ Knight has done a detailed and methodical post drilling into how to implement this pattern in Coherence.


An Overview of some of the best Coherence Patterns

Friday, November 4th, 2011

You can view the PDF version here

Coherence Implementation Patterns – Sig Nov 2011 from Ben Stopford

Latest-Versioned/Marker Patterns and MVCC

Wednesday, October 19th, 2011

Getting the basics right is obviously important. If you’re moving beyond what Andrew Wilson would call get-put man then you should be thinking about versioning your objects. That means making your data immutable. Doing this has a number of benefits:

  1. Versioning provides a historic record of changes.
  2. By linking versions to wall-clock / business time (i.e. bi-temporally), views of the system at previous points in time can be recomposed. This is important for providing consistent views over your data.
  3. Versioning allows concurrency to be managed through Multi-Version Concurrency Control (MVCC).

Implementing Versioning

However simply adding versions to your objects (more precisely to your object key) has the downside that you can no longer look up the value via its business key alone: you must know the business key as well as the version of the object that you want.

Key = [Business Key][Version]

In Coherence accessing objects via their key directly is far more performant than doing a query (see The Fallacy of Linear Scalability) so it is preferable to keep the latest version of the object available via its business key alone. There are two common approaches to solving this problem: The Latest/Versioned pattern and the Latest Version Marker pattern.

Approach 1: Latest and Versioned Caches

The first approach is to define two caches for every object. The Latest… cache and the Versioned… cache. The key of the ‘latest’ cache is simply the business key:

Latest Cache Key = [Business Key]

This cache only ever contains the latest object. The ‘versioned’ cache contains all versions of the object, with a (usually monotonically incrementing) version embedded in the key:

Versioned Cache Key = [Business Key][Version]

Writes must be directed at the ‘Latest’ cache and a Coherence Trigger is used to copy the object reference to the ‘Versioned’ cache adding the version onto the key as it does so. This is demonstrated in the first figure opposite.

The disadvantage of this approach is a memory inefficiency arising because the latest object exists in both the Latest and Versioned caches. When the object is written the same reference can be used to save space; however the backup copies in each cache will be different instances and, should a node be lost, the process of recreating the primary from the backup copy will create new instances by default, further eating memory. It is therefore advisable to use the Latest-Marker pattern below when memory is a concern. The advantage of this approach is that it reduces the number of records in the Latest cache, which makes filter operations faster when they operate only on ‘Latest’ data (a common use case in most applications).

Checklist:

  • Define two cache schemes based on the masks Latest* and Versioned* ensuring that they are in the same CacheService.
  • In the Latest* scheme specify a trigger to forward objects to the versioned cache, incrementing the version as it does so.
  • Specify KeyAssociation (Affinity) on the business key of the Latest* cache across both caches (a sketch of such a key is shown after this list).
  • Write a trigger that adds a monotonically incrementing version to the business key as it copies the value’s reference to the Versioned cache. You’ll need to use direct backing map access to avoid reentrancy problems (I’ve discussed the issues of reentrancy in Coherence before. See Merging Data And Processing: Why it doesn’t “just work”). The trigger code sample further below is provided for reference.
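For the key-association point above, a minimal sketch of a versioned key, assuming a String business key (the class is illustrative). Both the Latest and Versioned keys associate on the business key alone, so every version of an object lives in the same partition as its ‘latest’ entry and the trigger’s backing-map write stays local:

import java.io.Serializable;
import com.tangosol.net.cache.KeyAssociation;

public class VersionedKey implements KeyAssociation, Serializable {

   private final String businessKey;
   private final int version;

   public VersionedKey(String businessKey, int version) {
      this.businessKey = businessKey;
      this.version = version;
   }

   public Object getAssociatedKey() {
      return businessKey;   // partition on the business key alone
   }

   public boolean equals(Object o) {
      if (!(o instanceof VersionedKey)) return false;
      VersionedKey that = (VersionedKey) o;
      return version == that.version && businessKey.equals(that.businessKey);
   }

   public int hashCode() {
      return 31 * businessKey.hashCode() + version;
   }
}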

Approach 2: Versioned Cache Only With a Latest Version Marker

A second approach to solving the same problem is to only use a single cache with the key format:

Key = [Business Key][Version]

but specifying that the latest version of an object has a special version marker:

KeyLatest = [Business Key][LatestVersionMarker]

As clients are aware of the LatestVersionMarker (for example -1 is common) they can always access the latest value directly by calling:

cache.get([businessKey][-1])

This approach does not suffer from the duplication issues associated with separate Latest and Versioned caches, but has the disadvantage that versioned data sits in the same cache as latest data, marginally slowing down filters. Just to reiterate: in this pattern there is only one copy of the latest object, the one with the latest marker. This differs from the Latest/Versioned pattern, where the latest object exists in both caches (so twice) so that the versioned cache can contain all versions of that object.

Checklist:

  • Create a cache with a KeyAssociation on the business key (i.e. the key parts without the version number). Add a trigger that replaces the current value for the “LatestMarker” key with the new object, whilst copying the old value to a key with the appropriate real version. You’ll need to use direct backing map access to avoid reentrancy problems (I’ve discussed the issues of reentrancy in Coherence before [link]). See code sample below.

Implementing the trigger to avoid reentrancy issues

The code below outlines one mechanism for moving objects (in this case for the Latest/Versioned pattern) from one cache to the other using direct backing map access.

public void copyObjectToVersionedCacheAddingVersion(MapTrigger.Entry entry) {
   // This assumes you are tracking the version, and incrementing it, in your object.
   // Also note that it's more efficient to extract just the version rather than deserialise
   // the whole object, but this way is more succinct.
   MyValue value = (MyValue) entry.getValue();
   MyKey versionedKey = (MyKey) value.getKey();   // business key plus version

   BinaryEntry binaryEntry = (BinaryEntry) entry;
   Binary binaryValue = binaryEntry.getBinaryValue();

   // Serialise the versioned key and write the same binary value straight into the
   // Versioned cache's backing map (avoiding reentrancy problems)
   BackingMapManagerContext context = binaryEntry.getContext();
   Binary binaryVersionedKey = (Binary) context.getKeyToInternalConverter().convert(versionedKey);
   Map versionedCacheBackingMap = context.getBackingMap("VersionedCacheName");
   versionedCacheBackingMap.put(binaryVersionedKey, binaryValue);
}

If you are using Latest-Marker it’s essentially the same but with a marker key.

Latest/Versioned or Latest-Marker – which to choose?

Both patterns are good. We have used both extensively in my current project. Latest-Marker is probably best overall due to the aforementioned storage issues with Latest-Versioned. However if you are likely to make most use of the ‘Latest’ view, and will be scanning without the use of indexes, Latest-Versioned can offer performance benefits. It also feels simpler when you use it, as from the outside things are what they are.

These patterns are really important to use. They form the basis for many of the more advanced use cases. You need one of these to do MVCC, Snapshotting etc.

Note that affinity (Key Association) must be used to ensure that  the versioning process is entirely local to the JVM doing the write.

MVCC & Snapshotting

One of the main reasons for implementing these patterns is to enable the more advanced features of MVCC and snapshotting. MVCC is a concurrency control mechanism based on your objects being versioned. It is useful where two clients mutate the same version of an object and you want one write to succeed and the other to get a failure. This is very simple to implement in Coherence: include the object version in the write and have a trigger ensure that the version of the object being updated equals the one in the cache, throwing an exception otherwise.
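A minimal sketch of that optimistic check, assuming the value exposes getVersion()/setVersion() (illustrative names); a RuntimeException thrown from a trigger rejects the change and propagates back to the writing client:

import java.util.ConcurrentModificationException;
import com.tangosol.util.MapTrigger;

public class OptimisticVersionTrigger implements MapTrigger, java.io.Serializable {

   public void process(MapTrigger.Entry entry) {
      MyValue incoming = (MyValue) entry.getValue();

      if (entry.isOriginalPresent()) {
         MyValue current = (MyValue) entry.getOriginalValue();
         if (incoming.getVersion() != current.getVersion()) {
            // another client committed a newer version since this one read the object
            throw new ConcurrentModificationException(
                  "Stale write: cache holds version " + current.getVersion()
                  + ", write was based on version " + incoming.getVersion());
         }
         incoming.setVersion(current.getVersion() + 1);   // accept the write as the next version
         entry.setValue(incoming);
      }
   }
}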

Snapshotting is a more complex topic because it requires a consistent notion of time, so I’ve covered it in a separate post here.

Related Posts

  1. Use normalisation to reduce the versioning burden through the application of Star Schemas and Connected Replication [link]
  2. Performing cross cache joins in Coherence [link]
  3. Understanding problems of reentrancy in Coherence [link]

Joins: Advanced Patterns for Data Stores

Thursday, September 22nd, 2011

If you’ve hit this page you are probably thinking about adding joins to your Coherence cache. In general this is not a good idea. If your cache is just that, a cache, you can use the aggregate pattern to solve most problems. Joins are complex and come at a cost at runtime, so only add them if you need them. However they can be very useful in some circumstances.

Why do you need joins, why not just use aggregates?

You need joins if you can’t aggregate all of your data into one object in the cache. If your data is mastered in a separate database it is unlikely that you will need more than the aggregate pattern. You only need joins if Coherence is being used as the System of Record or entry point to the system. If you are using the cache-aside pattern it is unlikely that you will need joins.

Joins are useful under these circumstances:

  • Data arrives from different sources directly into the cache, so it is desirable to version the sources separately, accumulating individual histories and recombining them at runtime.
  • Entities need to be snapshotted independently.
  • Aggregates become overly weighty and a majority of requests don’t need the full aggregate so it is desirable to join at query time according to the user’s preference (there are other ways around this problem though including different aggregates as well as decomposing the aggregate on read).

Assuming you have one of the above use cases we’ll discuss how joins can be very useful.

The approach described here is the one taken on ODC at RBS. This is a project started in 2009 to build a centralised trade and reference data store in a bank. This use case is for a store, rather than a cache. In it Coherence plays the role of entry point to the system, hence joins are required to version independent histories and apply snapshotting.

Holding objects in an aggregate (denormalised) form leads to the problem of how you keep all that denormalised data consistent. Data must be duplicated, and this both eats memory (much more so if you implement versioning) and makes it very hard to implement any kind of consistency across all those copies.

To get around this we use an approach which looks like (but is subtly different from) the Star-Schema approach used in data warehousing. Entities are denoted Facts or Dimensions, where a Fact is a big thing that needs to be partitioned across the grid and a Dimension is a smaller thing that we can afford to replicate in our query processing layer, as there are simply not that many of them. In fact we take this model a step further by tracking the arcs on our domain model and only replicating those dimensions that are ‘connected’… but more on that later (see here).

In this context: Facts are defined as big objects that require partitioning, Dimensions are smaller objects with cross-cutting keys that will be replicated.

Primer on the Coherence Implementation

In Coherence we split the application architecture into two layers: the Query Layer and the Data Layer. Dimensions are cached in Continuous Query Caches in the Query Layer and the Facts are spread across the Coherence cluster. Related Facts can be joined in-process as Key Affinity is used to ensure collocation (i.e. they are partitioned with the same key and Coherence uses the key to determine which partition they should go in). Dimensions are replicated onto the Extend Proxies using CQCs and the joins are done at the start and end of queries using dictionary lookups.

All dimensions are checked, in process, via the same CQCs used to speed up the 2-stage query. This has an unfortunate consequence: changing a dimension will result in CQCs being updated across the cluster via one-phase commit. This presents a potential threat to atomicity and isolation since the changing dimension will be incoherent across multiple JVMs during the one-phase commit. Fortunately query isolation in such a model is still ensured, from our perspective, by making the simplifying assumption: consistency is only implemented within the context of requests to a single extend proxy. This is a manageable assumption.

Further optimisation:

The application of what we denote the Connected-Replication Pattern further optimises the replication of Dimensions by ensuring that only Dimensions that are actively referenced from Facts are replicated – Dimensions are only replicated if they are actually connected to other parts of the domain model.

By only replicating data that is connected to the object graph we, in practice, only replicate 10% of the Dimensions we store. This is a huge advantage.

Further Background: Executing complex joins in Shared-Nothing Architectures

The context of the problem is any partitioned data store: one where data is spread across a number of machines. This approach was first suggested by Dewitt et al in the Gamma Database and popularised in the database community with the term Sharding. The Sharding approach has been extended in more recent technologies by partitioning both data and the responsibility for processing it in what is termed a Shared Nothing Architecture. In Shared Nothing each node is self-sufficient, each having autonomy over the data it holds and processes. It is this autonomy that allows data-stores following this pattern to scale linearly for some common query loads.

Shared Nothing Architectures however come at a price. The partitioning model breaks down when queries require intermediary results to be shipped between machines, particularly where those intermediary results do not form part of the final result. Examples include joins between ‘Fact’ tables (where the join keys must be moved from one machine to another), multidimensional aggregations such as multi-dimensional risk calculations (i.e. the OLAP domain) or transactional writes that span the partitioning strategy.

Fortunately, many modern use-cases, particularly in the OLTP space, have little requirement for complex joins that span large data sets. For these simpler use-cases queries can be compartmentalised on a single node via some common attribute that they all share (known as a partitioning key, or Data Affinity in Oracle Coherence). For example, access to data in an online banking application might group data pertaining to a certain user. By choosing the UserId as a partitioning key, user-centric joins can be executed entirely on a single node and hence will scale.

The counter-example is queries that require lots of joins that cross the partitioning strategy. Extending our banking example, listing the details of accounts that a user can make payments into would mean accessing data associated with a different user. As the UserId is the partitioning key, account information for different users will be located in different partitions. This typically requires key shipping: a two-stage query which first returns the user’s details then scans the cluster for the various account details of the other users: the payees. This example is trivial but it alludes to a much larger problem when queries must include many different data items that cross partitioning (and hence machine) boundaries. It is these complex queries, those that need to join across a variety of crosscutting keys, that are problematic. The Connected-Replication pattern addresses this problem.

Compromising between Aggregate and Snowflake

There are three fundamental concepts that are used to optimise distributed data storage: Replication, Partitioning and Indexing. When it comes to data placement we have just the two: Replication and Partitioning – with our aim being to remove the need for cross-partition joins.

The approach here is to split the aggregate data-model to pull out dimensions that you wish to version separately. This ends up looking a bit like Snowflake. The Snowflake-Schema is well known in the data-warehousing space but its application here is quite different as we use it to define what we replicate and what we partition.

Take an object model such as that shown in Fig. 1. The dotted line represents the division between Facts and Dimensions. The line can only exist in one place: that being the point in the object model where it converges on a single entity – the focal point of one-to-many relationships. This focal point also identifies the most precise, commonly shared key among facts. Like the regular data warehousing application this can be described intuitively as Facts being the recorded fact, whilst dimensions represent the context that give that fact meaning.

The point of joins in this context is to allow sub-aggregates to be versioned separately and recombined at runtime

As an example let us consider a typical online shopping application, an Order would be a Fact whilst the Customer with which the Order is placed is a Dimension that provides the Order with ‘context’. This pattern is particularly applicable to distributed data storage as it provides a middle-ground between 3rd normal form, which presents too many joins for practical distributed applications and full denormalisation which presents a range of consistency issues when changes affect large numbers of denormalised entities (as well as proving problematic when objects need to be versioned).

Snowflaking is important for tempering a version explosion (versioning of objects is important for MVCC, which is necessary in most non-trivial data stores). By holding sub-entities separately they can be versioned independently, meaning that a change in a sub-entity, for example cashflows in the model in Fig. 3, does not necessitate a version increase on the other related objects: Transaction, MTM and Legs in this case. The alternative would be to hold all the data as a single Fact, but then any change would necessitate a new version of the whole group. This version explosion is prohibitive when using in-memory architectures.

So we have divided our data model into a Snowflake-Schema. However there is little novel to that. The value from doing so becomes evident when we use the two classifications of entities, Facts and Dimensions, to drive whether data is to be partitioned or replicated. In this manner we are able to balance replication and partitioning so that no distributed, sequential join operations are required: joins where the key-set for one entity must be retrieved first from one set of machines, followed by a second query to get related keys to complete the join from another entity on another machine. Referring back to the object model in Figs. 1/3, to query transactions for the PartyAlias “City Group” where the Product is an “FX trade” we would first request the PartyAlias IDs for “City Group” (stage 1 query); once that returned we would query for the product id for “FX trades” (stage 2 query); and finally we’d query for the Transactions that matched the keys for those PartyAliases and Products (stage 3 query). It is this sequential set of distributed queries that hurts performance.

This problem is solved through the application of a Snowflake Schema so that Dimensions are replicated to the Query Layer that sits in front of the grid, whilst keeping the Facts partitioned in the grid itself (See Fig. 4). Queries still need to be sequential as described above, but importantly all the Dimension queries remain in process as the dimension data is replicated (there is no network call required) and hence the cost is minimised.

As an example of this Fig. 3 shows a typical query in which the ‘where’ clause specifies a Cost Centre. Sequential queries must navigate their way down the object model until they reach the lowest dimension. In this case Source Book. Because these dimensions are replicated everywhere the calls are in-process and hence will be fast. The result is a set of IDs for this ‘lowest’ Dimension. These IDs are then used to query the Facts, which are held partitioned across the cluster. A distributed call is made to the grid to retrieve Facts. Fact and Sub-Fact joins are done, in-process, in the various partitions across the grid (as we know related Facts and Sub-Facts will be collocated in the same partition).
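A sketch of that flow on a query node (cache and accessor names are illustrative): the dimension CQCs are built once and held locally, so the dimension lookups involve no network call and only the final fact query is a distributed call to the partitioned Transaction cache.

import java.util.Set;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import com.tangosol.net.cache.ContinuousQueryCache;
import com.tangosol.util.filter.AlwaysFilter;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.InFilter;

public class CostCentreQuery {

   private final NamedCache costCentres = new ContinuousQueryCache(
         CacheFactory.getCache("CostCentre"), new AlwaysFilter(), true);
   private final NamedCache sourceBooks = new ContinuousQueryCache(
         CacheFactory.getCache("SourceBook"), new AlwaysFilter(), true);

   public Set transactionsFor(String costCentreName) {
      // 1. in-process: cost centre name -> cost centre ids
      Set costCentreIds = costCentres.keySet(new EqualsFilter("getName", costCentreName));
      // 2. in-process: cost centre ids -> source book ids (the lowest connected dimension)
      Set sourceBookIds = sourceBooks.keySet(new InFilter("getCostCentreId", costCentreIds));
      // 3. distributed: fetch the facts keyed by the lowest dimension's ids
      return CacheFactory.getCache("Transaction")
            .entrySet(new InFilter("getSourceBookId", sourceBookIds));
   }
}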

This concept is not novel; the commercial databases Vertica, Greenplum and other data warehousing products all make use of replicated data. However applying this pattern can be problematic. In-memory architectures are more constrained for storage than their disk-based brethren and replication is not a scalable storage pattern. This problem is addressed through the application of the Connected-Replication Pattern.

Making the Replication of Dimensions Practical in a Distributed In-Memory Architecture using the Connected-Replication Pattern

A reality of most commercial databases is that a large proportion of the data remains unused. This problem is highlighted by the work done around archiving in the database community. One recent study shows up to 80% of data in enterprise databases is no longer in use. The Connected Replication Pattern leverages this fact to reduce the amount of data that must be replicated by only replicating objects that are actively connected to Facts at any point in time.

The growth of Dimension data is a problem when applying a Snowflake-Schema to achieve fast joins through the replication of Dimensions. The reality is that some Dimensions will inevitably be large. In fact the data set used above actually includes some dimensions that are too large for replication. Fig. 5 shows that the Dimension “Party Alias” is both very large and does not share the same key as the Facts, so cannot be partitioned with them.

Connected Replication tracks the links between Dimensions and Facts as data is written to the store. This acts like a real time archiving process ensuring only the absolute minimum number of dimensions are replicated i.e. only those currently connected to Facts. Fig. 6 shows the size of dimensions after applying the Connected-Replication pattern using the same scale as Fig. 5. You can see there is more than an order of magnitude less data to replicate after Connected-Replication has been applied.

Under Connected-Replication, as data is written, a recursive process examines the relations between Dimensions and ensures that they are replicated. This is shown in the Fig. 7: A trade is written. It has three relations to PartyAlias, SourceBook and Ccy. A message is passed to the storage layer for each of these entities (the white lines) and if the Dimensions are not already replicated they are pushed into replicated storage in the Query Layer (the blue/yellow lines). This process recurses through all the arcs in the domain model until the Query Layer contains all “Connected-Dimensions”.

The mechanism can be either immediately or eventually consistent. The former not surprisingly decreases write performance. An offline process prunes the connected caches of unused dimensions corresponding to data that has been removed.

Hopefully you have seen that Connected-Replication provides a novel approach to balancing Replication and Partitioning so that joins can be done in-process whilst minimising the memory footprint. It is not necessary to have two layers. Replication could be added to all nodes in the grid (i.e. the two layers are folded together) but we find it preferable to hold them separately as the ODC has far more data nodes than it does query nodes (400 storage nodes are serviced by 40 query nodes). For our use case this provides a better use of memory.

There is more info on this in the slides from QCon and JavaOne

See Also:

Beyond the Data Grid: Coherence, Normalisation, Joins and Linear Scalability (QCon)

Performing Efficient Cross-Cache Joins in Coherence


Joins: Simple joins using CQC or Key-Association

Friday, November 20th, 2009

You need to return a data set made from related items in different distributed caches. You need to do a join. So how do you do it efficiently in Coherence?

As a general rule the aggregate pattern is the best approach for Coherence or other NoSQLs which implement cache aside. That is to say you are simply using the cache to scale data access, it is authored elsewhere – often a relational database. However sometimes it’s useful to use server side joins if you write data directly to the cache rather than using cache-aside.  Use cases for this include datasets that vary independently (in our world trades and risk results are good examples – you want to version them independently but you often want the results combined without two separate calls from the client).

The Two Options

In practice there are two types of join that are worth considering. The first is the trivial case, where you join on the client or extend proxy and use near caching or CQCs to optimise it. The more complex option joins on the server using key affinity.

Simple joining in the client/extend (two-stage query) with NC/CQC optimisation

Let’s look at the query ‘Get me orders for customers in Belgium’ with reference to the Northwind Database Schema:

Select o.* from Orders o, Customer c
where o.customerId = c.customerId
and c.country = 'Belgium'

The normal query plan for such a query would involve separating the query into two sections for the two where clauses.  This join is fairly easy to execute in the distributed world because the Customer table query is clearly smaller and hence should be evaluated first. That is to say that we know intuitively that “#Orders” >>> “#Customers in Belgium”.

//Stage 1
Set customersInBelgium = customersCache.keySet(
   new EqualsFilter("country", "Belgium")
);
//Stage 2
ordersCache.entrySet(
   new InFilter("customerId", customersInBelgium)
);

The best way to tackle a request like this is via a two-stage query hitting the Customer cache first and then the Orders cache. This assumes that the order of evaluation can be efficiently predetermined because the proportional data populations are well known. If your Customers cache is relatively small you can reduce this 2-stage query to a single wire call by either adding Near Caching to the Customers cache, so the join is local, or wrapping it in an invocable, running it on the server and putting the Customers in a Replicated Cache.

Tip: Use Continuous Query Caches or Replicated Caches to make 2-Stage joins a single step

The Single-Step Case: Doing Joins With Affinity Across Multiple Distributed Caches

The more complex case arises when the two sub sections of the query still return very large result sets. Using the two-staged query method for this type of join would result in very large data sets being returned back to the client during the intermediary phase.

Implementing such a join efficiently involves using affinity to bind together related data from the two caches.

I discussed this in some detail, along with the problems it brings up, in an associated article (here), in particular the problems that arise from the Coherence threading model. However if you read this post you’ll probably guess how performing a join naturally follows on from the idea of collocated processing.

So let’s look at how we do it using the Northwind Database Schema we used above. You wish to perform a query which, in SQL, would be represented as follows:

Select Orders.*
from Orders o, OrderDetails od
where o.orderId = od.orderId
and o.orderDate = today()
and od.unitPriceQuantityDiscount = 0.05

In this case both the result set from the Orders part of the query and that from the OrderDetails part will be large, even though the end product might be quite small. If we were to do this as a two-stage query in Coherence it would look like this:

Set orderIds = orders.keySet(new EqualsFilter("orderDate", new Date()));
orderDetails.entrySet(
   new AndFilter(
      new EqualsFilter("unitPriceQuantityDiscount", 0.05),
      new InFilter("orderId", orderIds)
   )
);

The inefficiency here is that all the orderIds for today will be returned to the client before the order details are queried. Fortunately we can do this all in one go on the server. To do this we need the following tools:

  • An Aggregator – this is the best way to run some custom code on the server that is based off a query.
  • Affinity to bind the Orders and OrderDetails caches together so that corresponding entries are collocated (so when we find an Order there is no network hop to get the corresponding OrderDetails record).
  • Some funky backing map magic to efficiently get at the entries we need.

The code ends up looking as below where this is the aggregate method of an Aggregator operating on the Orders cache.

public Object aggregate(Set setEntries) {
   Set results = new HashSet();
   for (Object e : setEntries) {
      BinaryEntry entry = (BinaryEntry) e;
      Binary binaryOrder = entry.getBinaryValue();
      PofValue pofOrder = PofValueParser.parse(binaryOrder, (PofContext) entry.getSerializer());
      Object orderId = pofOrder.getChild(Order.ORDER_ID_POF_VAL).getValue();
      Object orderIdInternal = entry.getContext().getKeyToInternalConverter().convert(orderId);
      Map detailsCache = entry.getContext().getBackingMap("OrderDetails");
      Binary binaryOrderDetails = (Binary) detailsCache.get(orderIdInternal);
      results.add(new Object[]{binaryOrder, binaryOrderDetails});
   }
   return results;
}

This method allows efficient joining of data across the cluster without shipping any data around. It works because we force Coherence to collocate Orders and OrderDetails with the same OrderId using Affinity. We then subvert the problems with the threading model (see here) by hitting the backing map directly.

There is one last trick that you may need to be aware of. The use case here was simplified because both tables have the same primary key. This is not always the case. If OrderDetails had a different PK, say OrderDetailsId, then we would not be able to access the OrderDetails backing map directly via the OrderId; instead we’d have to scan all objects in the backing map to look for it. The trick in this case is simply to set up your data model so that the OrderDetailsId is always derivable from the OrderId and other parameters that are mandatory in the query.

Using these two methods you can implement any type of join efficiently in Coherence. The only problem is that to reap these performance gains you need to know the join criteria, and something about your cache statistics, in advance.

Related Posts:

Find out how to do any join efficiently regardless of the key by applying snowflake schemas to manage replication and partitioning (HERE and a bit more here and here)

See JK’s post on Join filters here (much better than mine ;-) )


