Archive for the ‘Coherence Patterns’ Category

The Collections Cache

Monday, November 7th, 2011

This is a very simple pattern that can be used to solve a variety of problems. The structure uses a “Collections Cache”: a cache that appends values to a collection using a Trigger. You can then access the entire collection using a get() or alternatively use an EntryProcessor to extract a certain value from the collection.

The pattern is used on ODC to track aggregate views. Say you want a materialised view of trades grouped by book.  We keep a reverse index of trade references for each book. The view is updated asynchronously as data is added to the cache using an async CacheStore.  The pattern is applicable to a variety of other use cases, one being an approach to managing version history.

One downside of this pattern is that it can be confusing if the concept is not well known: after all, the object you put() is not the same type as the one returned from a get(). Simply naming such caches collections* can help avoid this confusion though.
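To make the put()/get() asymmetry concrete, here is a minimal plain-Java sketch of the idea. It uses no Coherence APIs at all: the class name CollectionsCacheSketch and its methods are hypothetical stand-ins, and the append that a Trigger would perform in the grid is just an ordinary method here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a "Collections Cache": put() appends, get() returns the collection.
final class CollectionsCacheSketch {
    private final Map<String, List<String>> tradeRefsByBook = new HashMap<>();

    // In Coherence the append would be done by a trigger; here it is a plain method call.
    void put(String bookId, String tradeRef) {
        tradeRefsByBook.computeIfAbsent(bookId, k -> new ArrayList<>()).add(tradeRef);
    }

    // Returns a copy of the whole collection, which is not the type that was put().
    List<String> get(String bookId) {
        return new ArrayList<>(tradeRefsByBook.getOrDefault(bookId, Collections.emptyList()));
    }
}
```

Putting two trade references against the same book and then calling get() on that book returns both references in insertion order.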


A Singleton Service

Saturday, November 5th, 2011

Being a data grid, Coherence is very good at doing things in a distributed way across all nodes in the cluster. However it doesn’t offer any functionality (currently) for running a service just the once, in a reliable manner. Most applications solve this problem by simply running another process, for example you might start a second process that reads data off some queue and keeps your cluster up to date. It’d be nice however if you could leverage Coherence’s fault tolerance to ensure that, if the cluster was running, your QueueListener was always running too. In fact this is fairly simple to do and can be used for a host of common applications including loading data, keeping it up to date, adding indexes and regulating a cluster wide time stamp (article to follow).

What we want is a service that will always run on one of our Coherence nodes no matter what happens to the cluster.

This solution is conceptually simple. You have lots of processes in your cluster. When each node starts it simply checks whether the service has already been started elsewhere by attempting to lock a fictitious, well-known key:

lockCache.lock("SingletonLockKey");

Only one of the processes in the cluster will attain the lock. If it does attain it then it starts the Singleton Service, adds indexes, loads data or whatever. Simple. If the node running the service dies then the lock is released and another process will acquire it and start the singleton service there.

// Run in a new thread on a wrapped DefaultCacheServer, i.e. this should run on every node
long blockUntilLockAcquired = -1; // a wait of -1 blocks until the lock is acquired
while (true) {
   boolean locked = lockCache.lock("SingletonLockKey", blockUntilLockAcquired);
   if (locked) {
      // start singletons here
      synchronized (this) {
         wait(); // park this thread; the lock is held until this node leaves the cluster
      }
   }
}
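The failover behaviour described above can be illustrated without a cluster. The following is only a toy model, assuming a single in-process lock: SingletonLockSketch, tryAcquire() and nodeDied() are hypothetical names, not Coherence calls, and real lock release on node death is handled by the grid itself.

```java
// Toy model of the well-known lock key: at most one node id owns it at any time.
final class SingletonLockSketch {
    private String owner; // null when nobody holds the lock

    // Non-blocking attempt; returns true only for the node that wins the lock.
    synchronized boolean tryAcquire(String nodeId) {
        if (owner == null) {
            owner = nodeId;
            return true;
        }
        return false;
    }

    // Models the lock being released when the owning node leaves the cluster.
    synchronized void nodeDied(String nodeId) {
        if (nodeId.equals(owner)) {
            owner = null;
        }
    }

    synchronized String owner() {
        return owner;
    }
}
```

The first node to call tryAcquire() wins and would start the service; once nodeDied() is called for it, the next node to try acquires the lock and takes over.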

A Reliable version of putAll()

Friday, November 4th, 2011

I like triggers in Coherence. They allow us to do lots of cool stuff to our objects as we add them to the cache. Implement versioning, stamp them with cluster time, save them to a messaging system, check for duplicate writes, check for concurrent writes … the list goes on. But with all this processing comes the risk of failure and Coherence provides little in the way of exception reporting. In fact it provides no information on the individual failures, something that quickly becomes a problem as the level of trigger functionality increases. On ODC this caused us a real problem so we re-implemented putAll() so that it correctly reported those writes that failed. Credit goes to Jonathan Knight and Andrew Wilson for working this implementation through.

The pattern is pretty simple at a high level. It involves two Invocables. The first simply executes on the extend proxy, as we need to be inside the cluster to get access to the key assignment strategy. The next step is to split the data being written into the subsets applicable to each node using getKeyOwner(). These subsets are then sent, via a second Invocable, to the members that own them and EntryProcessors are used to do the write to the backing map directly (although this is no longer needed in 3.7). This is shown pictorially below.
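The shape of the idea (split by owner, write each subset, report individual failures) can be sketched in plain Java. This is not the ODC implementation and uses no Coherence classes: ReliablePutAllSketch, the keyOwner function and the writer callback are all assumptions standing in for getKeyOwner(), the second Invocable and the EntryProcessor write respectively.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiConsumer;
import java.util.function.Function;

final class ReliablePutAllSketch {
    // Groups the entries by owning member, as the first step of the pattern does.
    static <K, V> Map<Integer, Map<K, V>> splitByOwner(Map<K, V> data, Function<K, Integer> keyOwner) {
        Map<Integer, Map<K, V>> byOwner = new HashMap<>();
        for (Map.Entry<K, V> e : data.entrySet()) {
            byOwner.computeIfAbsent(keyOwner.apply(e.getKey()), o -> new HashMap<>())
                   .put(e.getKey(), e.getValue());
        }
        return byOwner;
    }

    // Writes every entry and records the keys whose write threw, instead of aborting the batch.
    static <K, V> Map<K, Exception> putAll(Map<K, V> data,
                                           Function<K, Integer> keyOwner,
                                           BiConsumer<K, V> writer) {
        Map<K, Exception> failures = new HashMap<>();
        for (Map<K, V> subset : splitByOwner(data, keyOwner).values()) {
            for (Map.Entry<K, V> e : subset.entrySet()) {
                try {
                    writer.accept(e.getKey(), e.getValue());
                } catch (RuntimeException ex) {
                    failures.put(e.getKey(), ex);
                }
            }
        }
        return failures;
    }
}
```

The caller gets back a map from each failed key to its exception, which is the per-entry information the standard putAll() does not give you.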

[Edit Jan '12: My colleague Jon ‘The Gridman’ Knight has written a fantastically detailed post which drills into how to implement this pattern in Coherence.]


Coherence Implementation Patterns – Slides from Coherence SIG

Friday, November 4th, 2011

You can view the PDF version here


Managing Versioning

Wednesday, October 19th, 2011

This is the first in a series of posts describing some useful patterns for implementing Coherence data grids.

Most non-trivial caches need to version their objects. There are a number of reasons for wanting this:

  1. Versioning provides a historic record of changes.
  2. By linking versioning with wall-clock / business time (i.e. bi-temporal), views of the system at previous points in time can be recomposed. This is important for providing consistent views over your data.
  3. Versioning allows concurrency to be managed through Multi-Version Concurrency Control (MVCC)

However simply adding versions to your objects (more precisely your object key) has the downside that you can no longer look up the value via its business key: you must know the business key as well as the version of the object that you want.

Key = [Business Key][Version]

In Coherence accessing objects via their key directly is far more performant than doing a query (see The Fallacy of Linear Scalability) so it is preferable to keep the latest version of the object available via its business key alone. There are two common approaches to solving this problem: The Latest/Versioned pattern and the Latest Version Marker pattern.

Using Latest and Versioned Caches

The first approach is to define two caches for every object. The Latest… cache and the Versioned… cache. The key of the ‘latest’ cache is simply the business key:

Latest Cache Key = [Business Key]

This cache only ever contains the latest object. The ‘versioned’ cache contains all versions of the object with a (usually monotonically incrementing) version embedded in the key:

Versioned Cache Key = [Business Key][Version]

Writes must be directed at the ‘Latest’ cache and a Coherence Trigger is used to copy the object reference to the ‘Versioned’ cache adding the version onto the key as it does so. This is demonstrated in the first figure opposite.

The disadvantage of this approach is a memory inefficiency arising because the latest object exists in both the Latest and Versioned caches. When the object is written the same reference can be used to save space; however, the backup copies in each cache will be different instances and, should a node be lost, the process of recreating the primary from the backup copy will create new instances by default, further eating memory. It is therefore advisable to use the LatestMarker pattern below when memory is a concern. The advantage of this approach is that it reduces the number of records in the Latest caches, which makes filter operations faster when they operate only on ‘Latest’ data (a common use case in most applications).

Checklist:

  • Define two cache schemes based on the masks Latest* and Versioned* ensuring that they are in the same CacheService.
  • In the Latest* scheme specify a trigger to forward objects to the versioned cache, incrementing the version as it does so.
  • Specify KeyAssociation (Affinity) on the business key of the Latest* cache across both caches.
  • Write a trigger that adds a monotonically incrementing version to the business key as it copies the value’s reference to the Versioned cache. You’ll need to use direct backing map access to avoid reentrancy problems (I’ve discussed the issues of reentrancy in Coherence before. See Merging Data And Processing: Why it doesn’t “just work”). See the code sample under ‘Implementing the trigger to avoid reentrancy issues’ below.
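Before getting to the Coherence specifics, the key layout of the Latest/Versioned pattern can be sketched with two ordinary maps. This is an illustration only: LatestVersionedSketch is a hypothetical class, the "#" separator in the versioned key is an arbitrary choice, and the put() method stands in for the trigger on the Latest cache.

```java
import java.util.HashMap;
import java.util.Map;

final class LatestVersionedSketch {
    final Map<String, String> latest = new HashMap<>();     // key = business key
    final Map<String, String> versioned = new HashMap<>();  // key = business key + "#" + version
    private final Map<String, Integer> versions = new HashMap<>();

    // Stand-in for the trigger: every write to Latest is also copied to Versioned.
    int put(String businessKey, String value) {
        int version = versions.merge(businessKey, 1, Integer::sum); // 1, 2, 3, ...
        latest.put(businessKey, value);
        versioned.put(businessKey + "#" + version, value); // same reference in both maps
        return version;
    }
}
```

After two writes to the same business key, latest holds one entry (the newest value) while versioned holds both, which is the duplication of the latest object that the text warns about.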

Using Versioned Cache Only With a Latest Version Marker

A second approach to solving the same problem is to only use a single cache with the key format:

Key = [Business Key][Version]

but specifying that the latest version of an object has a special version marker:

KeyLatest = [Business Key][LatestVersionMarker]

As clients are aware of the LatestVersionMarker (for example -1 is common) they can always access the latest value directly by calling:

cache.get([businessKey][-1])

This approach does not suffer from the issues of duplication associated with separate Latest and Versioned caches but has the disadvantage that versioned data is in the same cache as latest data, marginally slowing down filters. To reiterate: in this pattern there is only one copy of the latest object, the one with the latest marker. This is different from the Latest/Versioned pattern, where the latest object will exist in both caches (so twice) so that the versioned cache can contain all versions of that object.

Checklist:

  • Create a cache with a KeyAssociation on the business key (i.e. the key parts without the version number). Add a trigger that replaces the current value for the “LatestMarker” with the new object whilst copying the old value to a key with the appropriate real version. You’ll need to use direct backing map access to avoid reentrancy problems (I’ve discussed the issues of reentrancy in Coherence before [link]). See the code sample under ‘Implementing the trigger to avoid reentrancy issues’ below.
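The marker variant can be sketched the same way with a single map. Again this is plain Java rather than a Coherence trigger: LatestMarkerSketch and its key() helper are hypothetical, and -1 is used as the marker only because the text mentions it as a common choice.

```java
import java.util.HashMap;
import java.util.Map;

final class LatestMarkerSketch {
    static final int LATEST = -1; // the latest-version marker
    final Map<String, String> cache = new HashMap<>();
    private final Map<String, Integer> versions = new HashMap<>(); // real version of the current latest

    static String key(String businessKey, int version) {
        return businessKey + "#" + version;
    }

    // Stand-in for the trigger: the new value takes the marker key and the old
    // value is moved to a key carrying its real version number.
    void put(String businessKey, String value) {
        String old = cache.put(key(businessKey, LATEST), value);
        if (old != null) {
            cache.put(key(businessKey, versions.get(businessKey)), old);
        }
        versions.merge(businessKey, 1, Integer::sum);
    }
}
```

Clients read the latest value with cache.get(key(businessKey, LATEST)); only superseded values ever appear under a real version, so the latest object is stored once.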

Implementing the trigger to avoid reentrancy issues

The code below outlines one mechanism for moving objects (in this case for the Latest/Versioned pattern) from one cache to the other using direct backing map access.

public void copyObjectToVersionedCacheAddingVersion(MapTrigger.Entry entry) {
   // I'm assuming that you are tracking the version, and incrementing it, in your object
   // Also note that it's more efficient to just take the version out rather than deserialise
   // the whole object but this way is more succinct
   MyValue value = (MyValue)entry.getValue();
   MyKey versionedKey = (MyKey)value.getKey();

   BinaryEntry binaryEntry = (BinaryEntry)entry;
   Binary binaryValue = binaryEntry.getBinaryValue();

   Map versionedCacheBackingMap = binaryEntry.getContext().getBackingMap("VersionedCacheName");
   // toBinary() is a helper (not shown) that converts the key to its internal Binary form
   versionedCacheBackingMap.put(toBinary(versionedKey), binaryValue);
}


Summary

Both approaches provide direct access to latest objects (most importantly key-based access to the most commonly used ‘latest’ version) without requiring knowledge of the version itself. Both retain a history of versions, something that is important for locking, MVCC and snapshotting (I’ll be writing more about these later). Affinity (Key Association) is used to ensure that  the versioning process is entirely local to the JVM doing the write.

Related Posts

  1. Use normalisation to reduce the versioning burden through the application of Star Schemas and Connected Replication [link]
  2. Performing cross cache joins in Coherence [link]
  3. Understanding problems of reentrancy in Coherence [link]

Joins: with Connected-Replication (Advanced)

Thursday, September 22nd, 2011

The article adds some detail to the ideas introduced in previous talks (particularly QCon and JavaOne). It describes a novel mechanism for storing data across a distributed architecture so that joins can be performed efficiently without key-shipping. A Snowflake-Schema is used to split data into Dimensions (which are then replicated) and Facts (which are partitioned). The Connected-Replication Pattern further optimises the replication of Dimensions by ensuring that only Dimensions that are actively referenced from Facts are replicated – Dimensions are only replicated if they are actually connected to other parts of the domain model. The approach has been implemented in the large in-memory data store, ODC, developed at The Royal Bank of Scotland. ODC applies the pattern to an in memory data grid built using Oracle Coherence but the pattern is applicable to any partitioned/sharded data store, be it in memory or disk resident.

The problem context: Executing complex joins in Shared-Nothing Architectures

The context of the problem is any partitioned data store: one where data is spread across a number of machines. This approach was first suggested by Dewitt et al in the Gamma Database and popularised in the database community with the term Sharding. The Sharding approach has been extended in more recent technologies by partitioning both data and the responsibility for processing it in what is termed a Shared Nothing Architecture. In Shared Nothing each node is self-sufficient, each having autonomy over the data it holds and processes. It is this autonomy that allows data-stores following this pattern to scale linearly for some common query loads.

Shared Nothing Architectures however come at a price. The partitioning model breaks down when queries require intermediary results to be shipped between machines, particularly where those intermediary results do not form part of the final result. Examples include joins between ‘Fact’ tables (where the join keys must be moved from one machine to another), multidimensional aggregations such as multi-dimensional risk calculations (i.e. the OLAP domain) or transactional writes that span the partitioning strategy.

Fortunately, many modern use-cases, particularly in the OLTP space, have little requirement for complex joins that span large data sets. For these simpler use-cases queries can be compartmentalised on a single node via some common attribute that they all share (known as a partitioning key or Data Affinity in Oracle Coherence). For example access to data in an online banking application might group data pertaining to a certain user. By choosing the UserId as a partitioning key, user-centric joins can be executed entirely on a single node and hence will scale.
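As a rough illustration of why a shared partitioning key keeps such joins on one node, consider the sketch below. The hashing scheme is an assumption made for the example (it is not how Coherence assigns partitions) and UserPartitioningSketch is a hypothetical name.

```java
final class UserPartitioningSketch {
    // Every record routed by the same userId lands in the same partition,
    // so a join between that user's records never has to leave the node.
    static int partitionFor(String userId, int partitionCount) {
        return Math.floorMod(userId.hashCode(), partitionCount);
    }
}
```

An account and a payment that both carry the same userId therefore resolve to the same partition number, whatever the partition count.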

The counter-example is queries that require lots of joins that cross the partitioning strategy. Extending our banking example, listing the details of accounts that a user can make payments into would mean accessing data associated with a different user. As the UserId is the partitioning key, account information for different users will be located in different partitions. This typically requires key shipping: a two-stage query which first returns the user’s details then scans the cluster for the account details of other users: the payees. This example is trivial but it alludes to a much larger problem when queries must include many different data items that cross partitioning (and hence machine) boundaries. It is these complex queries, those that need to join across a variety of crosscutting keys, that the Connected-Replication pattern addresses.

The Use of a Snowflake Schema

There are three fundamental concepts that are used to optimise distributed data storage: Replication, Partitioning and Indexing. Here we’ll focus on the first two: Replication and Partitioning – with the aim being to remove the need for cross-partition joins.

The first step is to represent the data model as a Snowflake-Schema. The concept of a Snowflake-Schema is well known in the data-warehousing space but its application here is quite different as we use it to define what we replicate and what we partition. Take an object model such as that shown in Fig. 1. The dotted line represents the division between Facts and Dimensions. The line can only exist in one place: that being the point in the object model where it converges on a single entity, the focal point of one-to-many relationships. This focal point also identifies the most precise, commonly shared key among Facts. As in regular data warehousing, this can be described intuitively as Facts being the recorded fact, whilst Dimensions represent the context that gives that fact meaning.

As an example, consider a typical online shopping application: an Order would be a Fact whilst the Customer with which the Order is placed is a Dimension that provides the Order with ‘context’. This pattern is particularly applicable to distributed data storage as it provides a middle ground between 3rd normal form, which presents too many joins for practical distributed applications, and full denormalisation, which presents a range of consistency issues when changes affect large numbers of denormalised entities (as well as proving problematic when objects need to be versioned).

Snowflaking is important for tempering a version explosion (versioning of objects is important for MVCC, necessary in most non-trivial data stores). By holding sub entities separately they can be versioned independently, meaning that a change in a sub entity, for example cashflows in the model in Fig. 3, does not necessitate a version increase on the other related objects: Transaction, MTM and Legs in this case. The alternative to this would be to hold all the data as a single Fact, but any change would then necessitate a new version of the whole group. This version explosion is prohibitive when using in-memory architectures.

So we have divided our data model into a Snowflake-Schema. However there is little novel in that. The value from doing so becomes evident when we use the two classifications of entities, Facts and Dimensions, to drive whether data is to be partitioned or replicated. In this manner we are able to balance replication and partitioning so that no distributed, sequential join operations are required: joins where the key-set for one entity must be retrieved first from one set of machines followed by a second query to get related keys to complete the join from another entity, on another machine. Referring back to the object model in Figs. 1/3, to query transactions for the PartyAlias “City Group” where the Product is an “FX trade” we would first request the PartyAlias IDs for “City Group” (Stage 1 query), once that returned we would query for the product id for “FX trades” (Stage 2 query) and then finally we’d query for the Transactions that matched the keys for those PartyAliases and Products (Stage 3 query). It is this sequential set of distributed queries that affects performance.

This problem is solved through the application of a Snowflake Schema so that Dimensions are replicated to the Query Layer that sits in front of the grid, whilst keeping the Facts partitioned in the grid itself (See Fig. 4). Queries still need to be sequential as described above, but importantly all the Dimension queries remain in process as the dimension data is replicated (there is no network call required) and hence the cost is minimised.

As an example of this Fig. 3 shows a typical query in which the ‘where’ clause specifies a Cost Centre. Sequential queries must navigate their way down the object model until they reach the lowest dimension. In this case Source Book. Because these dimensions are replicated everywhere the calls are in-process and hence will be fast. The result is a set of IDs for this ‘lowest’ Dimension. These IDs are then used to query the Facts, which are held partitioned across the cluster. A distributed call is made to the grid to retrieve Facts. Fact and Sub-Fact joins are done, in-process, in the various partitions across the grid (as we know related Facts and Sub-Facts will be collocated in the same partition).
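The query flow just described can be sketched with in-memory maps. This is a simplification under stated assumptions: SnowflakeQuerySketch and its method names are hypothetical, the Dimension is reduced to a single sourceBookId-to-costCentre map, and the second step is an ordinary loop where the real system would make a distributed call to the grid.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SnowflakeQuerySketch {
    // Step 1 (in-process, replicated data): resolve the predicate to IDs of the lowest Dimension.
    static Set<String> sourceBooksIn(Map<String, String> costCentreBySourceBook, String costCentre) {
        Set<String> ids = new HashSet<>();
        for (Map.Entry<String, String> e : costCentreBySourceBook.entrySet()) {
            if (e.getValue().equals(costCentre)) {
                ids.add(e.getKey());
            }
        }
        return ids;
    }

    // Step 2 (the distributed call in the real system): select Facts by those Dimension IDs.
    static List<String> transactionsIn(Map<String, String> sourceBookByTransaction, Set<String> sourceBookIds) {
        List<String> result = new ArrayList<>();
        for (Map.Entry<String, String> e : sourceBookByTransaction.entrySet()) {
            if (sourceBookIds.contains(e.getValue())) {
                result.add(e.getKey());
            }
        }
        return result;
    }
}
```

Only the set of Source Book IDs crosses from the first step to the second, which is why keeping the Dimension lookups in-process matters.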

This concept is not novel: the commercial databases Vertica and Greenplum, and other data warehousing products, all make use of replicated data. However applying this pattern can be problematic. In-memory architectures are more constrained for storage than their disk-based brethren and replication is not a scalable storage pattern. This problem is addressed through the application of the Connected-Replication Pattern.

Making the Replication of Dimensions Practical in a Distributed In-Memory Architecture using the Connected-Replication Pattern

A reality of most commercial databases is that a large proportion of the data remains unused. This problem is highlighted by the work done around archiving in the database community. One recent study shows up to 80% of data in enterprise databases is no longer in use. The Connected Replication Pattern leverages this fact to reduce the amount of data that must be replicated by only replicating objects that are actively connected to Facts at any point in time.

The growth of Dimension data is a problem when applying a Snowflake-Schema to achieve fast joins through the replication of Dimensions. The reality is that some Dimensions will inevitably be large. In fact the data set used above actually includes some Dimensions that are too large for replication. Fig. 5 includes the Dimension “Party Alias”, which is both very large and does not share the same key as the other Facts, so cannot be partitioned with them.

Connected Replication tracks the links between Dimensions and Facts as data is written to the store. This acts like a real time archiving process ensuring only the absolute minimum number of dimensions are replicated i.e. only those currently connected to Facts. Fig. 6 shows the size of dimensions after applying the Connected-Replication pattern using the same scale as Fig. 5. You can see there is more than an order of magnitude less data to replicate after Connected-Replication has been applied.

Under Connected-Replication, as data is written, a recursive process examines the relations between Dimensions and ensures that they are replicated. This is shown in Fig. 7: A trade is written. It has three relations to PartyAlias, SourceBook and Ccy. A message is passed to the storage layer for each of these entities (the white lines) and if the Dimensions are not already replicated they are pushed into replicated storage in the Query Layer (the blue/yellow lines). This process recurses through all the arcs in the domain model until the Query Layer contains all “Connected-Dimensions”.
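The recursion can be sketched as a simple graph walk. This is a toy model, not the ODC code: ConnectedReplicationSketch is a hypothetical class, the references map stands in for the arcs of the domain model, and adding an id to the replicated set stands in for pushing a Dimension into the Query Layer.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class ConnectedReplicationSketch {
    private final Map<String, List<String>> references; // entity id -> ids of Dimensions it points to
    final Set<String> replicated = new HashSet<>();     // stand-in for the Query Layer's replicated storage

    ConnectedReplicationSketch(Map<String, List<String>> references) {
        this.references = references;
    }

    // Called as a Fact is written: replicate every Dimension reachable from it.
    void onFactWritten(String factId) {
        for (String dimensionId : references.getOrDefault(factId, Collections.emptyList())) {
            replicate(dimensionId);
        }
    }

    private void replicate(String dimensionId) {
        if (!replicated.add(dimensionId)) {
            return; // already replicated, so its arcs have been followed before
        }
        for (String next : references.getOrDefault(dimensionId, Collections.emptyList())) {
            replicate(next);
        }
    }
}
```

Dimensions that no Fact reaches never enter the replicated set, which is where the memory saving comes from. The early return on already-replicated ids also keeps the walk from looping if the model contains cycles.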

The mechanism can be either immediately or eventually consistent. The former not surprisingly decreases write performance. An offline process prunes the connected caches of unused dimensions corresponding to data that has been removed.

Hopefully you have seen that Connected-Replication provides a novel approach to balancing Replication and Partitioning so that joins can be done in-process whilst minimising the memory footprint. It is not necessary to have two layers. Replication could be added to all nodes in the grid (i.e. the two layers are folded together) but we find it preferable to hold them separately as the ODC has far more data nodes than it does query nodes (400 storage nodes are serviced by 40 query nodes). For our use case this provides a better use of memory.

See Also:

Beyond the Data Grid: Coherence, Normalisation, Joins and Linear Scalability (QCon)

Performing Efficient Cross-Cache Joins in Coherence


Joins: using Snowflake Schemas & CQCs (Intermediate)

Monday, May 9th, 2011

The application we recently built (ODC) departs slightly from the standard “dump the whole denormalised object into a cache” approach recommended for most caching implementations. The reason is that the project is not really a cache, it’s a data store, and as such we need to be able to manipulate different parts of the domain model independently. Holding objects in a denormalised form leads to the problem of how you keep all that denormalised data consistent. Data must be duplicated and this both eats memory (and much more so if you implement versioning) as well as making it very hard to implement any kind of consistency across all those copies.

To get around this we use a Star-Schema approach as a convenient mechanism for denoting whether an entity should be (a) a big thing that needs to be partitioned across the grid or (b) a smaller thing that we can afford to replicate in our query processing layer as there are simply not that many of them. In fact we take this model a step further by tracking the arcs on our domain model and only replicating those that are ‘connected’… but more on that later (see here).

In our case we split the application architecture into two layers: The Query Layer and the Data Layer. Dimensions are cached in Continuous Query Caches in the Query layer and the Facts are spread across the Coherence cluster. Related facts can be joined in-process as Key Affinity is used to ensure collocation (i.e. they are partitioned with the same key and Coherence uses the key to determine which partition they should go in).

The star-schema approach is appropriate as it allows dimensions to be changed in one place (atomically/isolated) in contrast to the denormalised approach, in which the dimension attributes would be held with the many facts that related to them leading to lots of duplication. Using this model you can store a complex relational model in Coherence and query it in an efficient way.

Efficiently Joining Facts and Dimensions in Coherence

The de facto standard for joining in Coherence is the 2-stage query (although it is really multi-stage). The first stage always hits the dimension tables for the query predicates. The second stage returns the facts based on the dimension sets. The third stage returns the dimensions required to present the user’s view.

If there are extra references internal to a dimension then these will elongate steps 1 and 3. The key point however is that we use Continuous Query Caches to ensure all joins to Dimensions are local.

Efficiently Joining Facts with other Facts in Coherence

If two fact entities must be joined then the join should be done object by object across the cluster. An Aggregator is used to run the computation with Affinity used to bind related records into the same JVM. The mechanism for doing this is documented fully in [i].

The Fact-Fact join approach is likely to be used in conjunction with the 2-stage query, with the latter composing in relevant dimension objects.

Managing Concurrency and Isolation in the Data Model

Coherence provides little support for Atomic Transactions (beyond the granularity of a single object), Consistency or Isolation. Thus we must simulate them as required. (Recently Oracle has introduced a Transaction API, but we still find it preferable to avoid the need for taking out distributed locks wherever possible.)

The first tool for doing this is the implementation of MVCC[ii] within the data model. This is really just a fancy way of saying that we version objects and thus those objects are immutable. The benefit of such a model is that the view of the system can be defined, at any time, as the set of all objects whose version matches that point in time. The versioning also allows clients to determine consistency themselves (when writing a record you can ensure that there were no other concurrent updates from other users that might be overwritten).
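The client-side consistency check mentioned above amounts to an optimistic write: the write succeeds only if the version the client read is still the current one. The sketch below shows that check in plain Java; VersionedStoreSketch and its methods are hypothetical, and a single synchronized map stands in for the grid.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class VersionedStoreSketch {
    // Every version is kept (values are never overwritten), so history is immutable.
    private final Map<String, List<String>> history = new HashMap<>();

    synchronized int currentVersion(String key) {
        List<String> versions = history.get(key);
        return versions == null ? 0 : versions.size();
    }

    // Succeeds only if nobody else has written since the caller read expectedVersion.
    synchronized boolean write(String key, int expectedVersion, String value) {
        if (currentVersion(key) != expectedVersion) {
            return false; // a concurrent update happened; the caller must re-read and retry
        }
        history.computeIfAbsent(key, k -> new ArrayList<>()).add(value);
        return true;
    }

    // Reads the value as it was at a given version (1-based).
    synchronized String read(String key, int version) {
        return history.get(key).get(version - 1);
    }
}
```

A writer holding a stale version is rejected rather than silently overwriting the newer value, and earlier versions remain readable for point-in-time views.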

As Fact objects enter the cache their content must be checked for referential integrity. This means that the user’s identification of every Dimension object must be validated to ensure that:

  • The Dimension is identifiable (i.e. it exists)
  • It is unique (only one dimension is identified)
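The two checks above can be expressed as a single count of matching Dimensions. This is a minimal sketch assuming Dimensions are matched on a plain string identifier; DimensionCheckSketch and its Result enum are hypothetical names, and in ODC the lookup would go through the CQCs rather than a collection passed in by the caller.

```java
import java.util.Collection;

final class DimensionCheckSketch {
    enum Result { OK, NOT_FOUND, AMBIGUOUS }

    // Exactly one Dimension must match the identifier supplied by the user.
    static Result check(Collection<String> dimensionIdentifiers, String userIdentifier) {
        int matches = 0;
        for (String id : dimensionIdentifiers) {
            if (id.equals(userIdentifier)) {
                matches++;
            }
        }
        if (matches == 0) {
            return Result.NOT_FOUND;
        }
        return matches == 1 ? Result.OK : Result.AMBIGUOUS;
    }
}
```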

All dimensions are checked, in process, via the same CQCs used to speed up the 2-stage query. This has an unfortunate consequence: changing a dimension will result in CQCs being updated across the cluster via one-phase commit. This presents a potential threat to atomicity and isolation since the changing dimension will be incoherent across multiple JVMs during the one-phase commit. Fortunately, from our perspective, query isolation in such a model is still ensured by making a simplifying assumption: consistency is only implemented within a single query processing node.

For more information see QCon presentation [iii].


[i] http://www.benstopford.com/2009/11/20/how-to-perform-efficient-cross-cache-joins-in-coherence/

[ii] http://en.wikipedia.org/wiki/Multiversion_concurrency_control

[iii] http://www.benstopford.com/2011/01/27/beyond-the-data-grid-building-a-normalised-data-store-using-coherence/


Joins: using Key-Association (Simplest)

Friday, November 20th, 2009

You need to return a data set made from related items in different distributed caches. You need to do a join. So how do you do it efficiently in Coherence?

In practice there are two types of join that are worth considering. The first is the simple case, where you have two separate entities that need to be joined together and one of the ‘branches’ of the query has a smaller result.

The Simple Case: The Two Staged Query

Let’s look at the query ‘Get me orders for customers in Belgium’ with reference to the Northwind Database Schema:

Select o.* from Orders o, Customer c
where o.customerId = c.customerId
and c.country = 'Belgium'

The normal query plan for such a query would involve separating the query into two sections for the two where clauses.  This join is fairly easy to execute in the distributed world because the Customer table query is clearly smaller and hence should be evaluated first. That is to say that we know intuitively that “#Orders” >>> “#Customers in Belgium”.

//Stage 1
Set customersInBelgium = customersCache.keySet(
   new EqualsFilter("country", "Belgium")
);
//Stage 2
ordersCache.entrySet(
   new InFilter("customerId", customersInBelgium)
);

The best way to tackle a request like this is via a two-stage query hitting the Customers cache first and then the Orders cache. This assumes that the order can be efficiently predetermined because the proportional data populations are well known. If your Customers cache is relatively small you can make this 2-stage query have only one wire call, either by adding Near Caching to the Customers cache so the join is local, or by wrapping the query in an Invocable, running it on the server and putting the Customers in a Replicated Cache.

Tip: Use Continuous Query Caches or Replicated Caches to make 2-Stage joins a single step

The Complex Case: Doing Joins With Affinity Across Multiple Distributed Caches

The more complex case arises when the two sub sections of the query still return very large result sets. Using the two-staged query method for this type of join would result in very large data sets being returned back to the client during the intermediary phase.

Implementing such a join efficiently involves using affinity to bind together related data from the two caches. I discussed this in some detail, along with the problems it brings up, in an associated article (here), in particular the problems that arise from the Coherence threading model. However if you read this post you’ll probably guess how performing a join naturally follows on from the idea of collocated processing.

So let’s look at how we do it using the Northwind Database Schema we used above. You wish to perform a query which, in SQL, would be represented as such:

 Select o.*
 from Orders o, OrderDetails od
 where o.orderId = od.orderId
 and o.orderDate = today()
 and od.unitPriceQuantityDiscount = 0.05

In this case both the result sets from the Orders part of the query and from the OrderDetails part will be large even though the end product might be quite small. If we were to do this as a two-stage query in Coherence it would look like this:

Set orderIds = orders.keySet(
      new EqualsFilter("orderDate", new Date())
);
orderDetails.entrySet(
   new AndFilter(
      new EqualsFilter("unitPriceQuantityDiscount", 0.05),
      new InFilter("orderId", orderIds)
   )
);

The inefficiency here is that all the orderIds for today will be returned to the client before the order details are queried. Fortunately we can do this all in one go on the server. To do this we need the following tools:

  • An Aggregator – this is the best way to run some custom code on the server that is based off a query.
  • Affinity to bind the Orders and OrderDetails caches together so that corresponding entries are collocated (so when we find an order there is no network hop to get the corresponding OrderDetails record).
  • Some funky backing map magic to efficiently get at the entries we need.

The code ends up looking as below where this is the aggregate method of an Aggregator operating on the Orders cache.

public Object aggregate(Set setEntries) {
   // Each result is a pair: {binary Order, binary OrderDetails}
   Set<Object[]> results = new HashSet<Object[]>();
   for (Object e : setEntries) {
      BinaryEntry entry = (BinaryEntry) e;
      Binary binaryOrder = entry.getBinaryValue();
      PofValue pofOrder = PofValueParser.parse(binaryOrder, (PofContext) entry.getSerializer());
      Object orderId = pofOrder.getChild(Order.ORDER_ID_POF_VAL).getValue();
      // Convert the key to its internal (binary) form before touching the backing map
      Object orderIdInternal = entry.getContext().getKeyToInternalConverter().convert(orderId);
      Map detailsCache = entry.getContext().getBackingMap("OrderDetails");
      Binary binaryOrderDetails = (Binary) detailsCache.get(orderIdInternal);
      results.add(new Object[]{binaryOrder, binaryOrderDetails});
   }
   return results;
}

This method allows efficient joining of data across the cluster without shipping any data around. It works because we force Coherence to collocate Orders and OrderDetails with the same OrderId using Affinity. We then subvert the problems with the threading model (see here) by hitting the backing map directly.

There is one last trick that you may need to be aware of. The use case here was simplified because both tables have the same primary key. This is not always the case. If OrderDetails had a different PK, say OrderDetailsId, then we would not be able to access the OrderDetails backing map directly via the OrderId, instead we’d have to scan all objects in the backing map to look for it. The trick in this case is simply to set up your data model so that your OrderDetailsId is always derivable from the OrderId and other parameters that are mandatory in the query.
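One way to make the OrderDetailsId derivable is to compose it from the OrderId plus another value the query always supplies. The sketch below assumes that value is the productId and uses a plain map as a stand-in for the backing map; DerivedKeySketch and the "orderId:productId" format are hypothetical choices for illustration.

```java
import java.util.Map;

final class DerivedKeySketch {
    // The OrderDetails key is computed, not looked up, so no scan is ever needed.
    static String orderDetailsId(int orderId, int productId) {
        return orderId + ":" + productId;
    }

    // Direct key access against the (stand-in) backing map.
    static String lookup(Map<String, String> orderDetailsBackingMap, int orderId, int productId) {
        return orderDetailsBackingMap.get(orderDetailsId(orderId, productId));
    }
}
```

Because the key is a pure function of values already present in the query, the join stays a single get() per order rather than a scan of the backing map.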

Using these two methods you can implement any type of join efficiently in Coherence. The only problem is that to reap these performance gains you need to know the join criteria, and something about your cache statistics, in advance.

Related Posts:

Find out how to do any join efficiently regardless of the key by applying snowflake schemas to manage replication and partitioning (HERE and a bit more here and here)

See JK’s post on Join filters here (much better than mine ;-) )


Historic posts on www.BenStopford.com