Joins: Simple joins using CQC or Key-Association

You need to return a data set made from related items in different distributed caches. You need to do a join. So how do you do it efficiently in Coherence?

As a general rule the aggregate pattern is the best approach for Coherence, or for other NoSQL stores that implement cache-aside – that is, where you are simply using the cache to scale data access and the data is authored elsewhere, often in a relational database. However, server-side joins can be useful if you write data directly to the cache rather than using cache-aside. Use cases for this include datasets that vary independently (in our world, trades and risk results are good examples – you want to version them independently, but you often want the results combined without two separate calls from the client).

The Two Options

In practice there are two types of join worth considering. The first is the trivial case, where you join on the client or extend proxy and use near caching or CQCs to optimise it. The more complex option joins on the server using key affinity.

Simple joining in the client/extend (two-stage query) with NC/CQC optimisation

Let's look at the query 'Get me orders for customers in Belgium', with reference to the Northwind database schema:

Select o.* from Orders o, Customers c
where o.customerId = c.customerId
and c.country = 'Belgium'

The normal query plan for such a query would involve splitting it into two sections, one per where clause. This join is fairly easy to execute in the distributed world because the Customers query clearly returns the smaller result set and hence should be evaluated first. That is to say, we know intuitively that #Orders >> #Customers-in-Belgium.

//Stage 1
Set customersInBelgium = customersCache.keySet(
   new EqualsFilter("country", "Belgium")
);
//Stage 2
ordersCache.entrySet(
   new InFilter("customerId", customersInBelgium)
);

The best way to tackle a request like this is a two-stage query that hits the Customers cache first and then the Orders cache. This assumes the order of evaluation can be predetermined because the relative data populations are well known. If your Customers cache is relatively small you can reduce this two-stage query to a single wire call, either by adding near caching to the Customers cache so the join is local, or by wrapping it in an invocable, running it on the server, and holding the Customers in a replicated cache.

Tip: Use Continuous Query Caches or Replicated Caches to make 2-Stage joins a single step
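To make the shape of that plan concrete, here is a minimal plain-Java sketch of the same two-stage join over in-memory maps. The `Customer`/`Order` records, field names and `ordersForCountry` helper are illustrative, not the Northwind classes or any Coherence API – stage 1 plays the role of the `keySet` call, stage 2 the `InFilter`:

```java
import java.util.*;
import java.util.stream.*;

public class TwoStageJoin {
    record Customer(String customerId, String country) {}
    record Order(String orderId, String customerId) {}

    static List<Order> ordersForCountry(Map<String, Customer> customers,
                                        Map<String, Order> orders,
                                        String country) {
        // Stage 1: evaluate the smaller predicate first (the keySet() call)
        Set<String> customerIds = customers.values().stream()
                .filter(c -> c.country().equals(country))
                .map(Customer::customerId)
                .collect(Collectors.toSet());
        // Stage 2: the equivalent of the InFilter over the Orders cache
        return orders.values().stream()
                .filter(o -> customerIds.contains(o.customerId()))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Map<String, Customer> customers = Map.of(
                "c1", new Customer("c1", "Belgium"),
                "c2", new Customer("c2", "France"));
        Map<String, Order> orders = Map.of(
                "o1", new Order("o1", "c1"),
                "o2", new Order("o2", "c2"),
                "o3", new Order("o3", "c1"));
        System.out.println(ordersForCountry(customers, orders, "Belgium").size()); // prints 2
    }
}
```

Near caching or a CQC effectively moves stage 1's map local to the caller, which is why the two stages collapse into one wire call.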

The Single-Step Case: Doing Joins With Affinity Across Multiple Distributed Caches

The more complex case arises when the two subsections of the query still return very large result sets. Using the two-stage query method for this type of join would result in a very large intermediate data set being shipped back to the client.

Implementing such a join efficiently involves using affinity to bind together related data from the two caches.
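As a sketch of what "binding with affinity" means: Coherence partitions on a key's associated key rather than the key itself, so a composite details key whose associated key is the parent orderId always lands with its parent. The snippet below models that contract in plain Java – `OrderDetailsId`, `getAssociatedKey` and the partition count are illustrative stand-ins for Coherence's `KeyAssociation` mechanism, not the real API:

```java
import java.util.*;

public class AffinitySketch {
    static final int PARTITIONS = 13;

    // Composite key for a detail row: its "associated key" is the parent
    // orderId, mirroring what KeyAssociation.getAssociatedKey() would return.
    record OrderDetailsId(long orderId, int seq) {
        Object getAssociatedKey() { return orderId; }
    }

    // Partitioning hashes the associated key, not the full key, so an Order
    // and all its OrderDetails map to the same partition.
    static int partitionOf(Object associatedKey) {
        return Math.floorMod(associatedKey.hashCode(), PARTITIONS);
    }

    public static void main(String[] args) {
        long orderId = 42L;
        int orderPartition = partitionOf(orderId);
        for (int seq = 0; seq < 5; seq++) {
            OrderDetailsId id = new OrderDetailsId(orderId, seq);
            // every detail lands with its parent order
            assert partitionOf(id.getAssociatedKey()) == orderPartition;
        }
        System.out.println("all details collocated in partition " + orderPartition);
    }
}
```

This collocation is what makes it safe for an aggregator running against an Order entry to read the OrderDetails backing map in the same partition.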

I discussed this in some detail, along with the problems it brings up, in an associated article (here), in particular the problems that arise from the Coherence threading model. If you have read that post you'll probably guess how performing a join follows naturally from the idea of collocated processing.

So let's look at how we do it using the Northwind schema from above. You wish to perform a query which, in SQL, would be expressed as:

Select Orders.*
from Orders o, OrderDetails od
where o.orderId = od.orderId
and o.orderDate = today()
and od.unitPriceQuantityDiscount = 0.05

In this case the result sets from both the Orders and OrderDetails parts of the query will be large, even though the end product might be quite small. Done as a two-stage query in Coherence it would look like this:

Set orderIds = orders.keySet(new EqualsFilter("orderDate", new Date()));
orderDetails.entrySet(
   new AndFilter(
      new EqualsFilter("unitPriceQuantityDiscount", 0.05),
      new InFilter("orderId", orderIds)
   )
);

The inefficiency is that all of today's orderIds are returned to the client before the order details are queried. Fortunately we can do the whole thing in one go on the server. To do this we need the following tools:

- Key affinity, to collocate each Order with its OrderDetails in the same partition.
- A server-side Aggregator, so the join executes where the data lives.
- Direct access to the OrderDetails backing map from within the aggregator.

The code ends up looking as below, where this is the aggregate method of an Aggregator operating on the Orders cache.

public Object aggregate(Set orders) {
    Map<Order, Details[]> all = new HashMap<Order, Details[]>();
    List<Details> buffer = new ArrayList<Details>();

    for (BinaryEntry entry : (Set<BinaryEntry>) orders) {
        long orderId = (Long) entry.getKey();
        //hit the OrderDetails backing map directly; affinity guarantees the
        //related details live in this partition
        BackingMapContext context = entry.getContext().getBackingMapContext("order-details");
        Collection<Binary> valBackMap = context.getBackingMap().values();
        for (Binary val : valBackMap) {
            Details details = (Details) ExternalizableHelper.fromBinary(val, entry.getSerializer());
            if (details.getOrderId() == orderId) {
                buffer.add(details);
            }
        }
        all.put((Order) entry.getValue(), buffer.toArray(new Details[]{}));
        buffer.clear();
    }
    return all;
}

There is a better example of this (which you can run) here.

This method allows efficient joining of data across the cluster without shipping any data around. It works because we force Coherence to collocate Orders and OrderDetails that share an OrderId, using affinity. We then sidestep the problems with the threading model (see here) by hitting the backing map directly.

There is one last trick you may need to be aware of. The use case here was simplified because both tables share the same primary key. This is not always the case. If OrderDetails had a different PK, say OrderDetailsId, then we would not be able to access the OrderDetails backing map directly via the OrderId; instead we would have to scan every object in the backing map looking for it. The trick in this case is simply to design your data model so that OrderDetailsId is always derivable from the OrderId and the other parameters that are mandatory in the query.
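A plain-Java sketch of that derivable-key trick follows. The `OrderDetailsId` scheme, where details are numbered 0, 1, 2… under their parent order, is an illustrative convention (not part of Coherence); given it, related details can be found with a handful of direct gets instead of a scan:

```java
import java.util.*;

public class DerivableKeyLookup {
    // Composite key derivable from the orderId plus a sequence number
    record OrderDetailsId(long orderId, int seq) {}

    // Instead of scanning the whole details map, probe the derived keys
    // (orderId, 0), (orderId, 1), ... until the first miss.
    static List<String> detailsFor(Map<OrderDetailsId, String> details, long orderId) {
        List<String> found = new ArrayList<>();
        for (int seq = 0; ; seq++) {
            String d = details.get(new OrderDetailsId(orderId, seq));
            if (d == null) break;
            found.add(d);
        }
        return found;
    }

    public static void main(String[] args) {
        Map<OrderDetailsId, String> details = new HashMap<>();
        details.put(new OrderDetailsId(1L, 0), "2 x widgets");
        details.put(new OrderDetailsId(1L, 1), "1 x gadget");
        details.put(new OrderDetailsId(2L, 0), "5 x sprockets");
        System.out.println(detailsFor(details, 1L)); // prints [2 x widgets, 1 x gadget]
    }
}
```

In the real cluster the map would be the OrderDetails backing map and each derived key would be converted to its internal Binary form before the get, as discussed in the comments below.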

Using these two methods you can implement any type of join efficiently in Coherence. The only problem is that to reap these performance gains you need to know the join criteria, and something about your cache statistics, in advance.

Related Posts:

Find out how to do any join efficiently regardless of the key by applying snowflake schemas to manage replication and partitioning (HERE and a bit more here and here)

See JK’s post on Join filters here (much better than mine ;-))


Posted on November 20th, 2009 in Coherence


  1. Dave Felcey October 9th, 2010
    15:03 GMT

    Hi Ben,

    I stumbled across your blog today by chance. This is a really good and useful post – as are the others.

  2. ben October 13th, 2010
    11:19 GMT

Thanks David ;-) There are just the other ones on my blog for now, but there will be more coming.

  3. Nicolas December 24th, 2010
    12:45 GMT

    Hi Ben,
    Very interesting post!

    One question:
    The tables I’m joining don’t share the same PK.
    Could you please clarify the following?
    “… The trick in this case is simply to set up your data model so that your OrderDetailsId is always derivable from the OrderId and other parameters that are mandatory in the query …”

    Regards,
    Nico

  4. ben December 29th, 2010
    17:41 GMT

    Hi Nico

    Merry Christmas!

    A very good question.

    My example was very simple. It is more usual to join caches that don’t share the same primary key as you say. The most usual case would be to have OrderDetails object having a Foreign Key reference back to the Orders object via the OrdersId (many to one relation).

    OrderDetails.orderId => Order.orderId

This presents a problem if we are querying the Orders cache, as the join has an implicit direction OrderDetails => Orders dictated by the presence of the foreign key. However this direction doesn't help us much, as we'd like to query Orders and join in the opposite direction, to the relevant OrderDetails. We really need a reverse index that points in this direction, but we don't have one.

One solution I alluded to in the post is to just scan the keys in the cache. That is not a scalable solution, which is why I didn't recommend it. The bit you refer to suggests an alternative approach in which a suitable heuristic is used to do the reverse lookup efficiently by making the OrderDetailsId derivable at runtime.

    For example you might define OrderDetailsId as the association of the OrderId and a monotonically incrementing integer. Then you can simply code something to join all OrderDetails onto each Order via a limited set of HashMap lookups. In pseudocode:

    public Object aggregate(Set setEntries) {
       Set<Object[]> results = new HashSet<Object[]>();
       for (Object e : setEntries) {
          BinaryEntry entry = (BinaryEntry) e;
          Binary binaryOrder = entry.getBinaryValue();
          PofValue pofOrder = PofValueParser.parse(binaryOrder, (PofContext) entry.getSerializer());
          Object orderId = pofOrder.getChild(Order.ORDER_ID_POF_VAL).getValue();
          Converter keyConverter = entry.getContext().getKeyToInternalConverter();
          Map detailsCache = entry.getContext().getBackingMap("OrderDetails");

          addDetails(detailsCache, keyConverter, results, orderId, binaryOrder);
       }
       return results;
    }

    private void addDetails(Map detailsCache, Converter keyConverter, Set<Object[]> results, Object orderId, Binary binaryOrder) {
       int monotonicId = 0;
       while (true) {
          //derive the details key and convert it to its internal (Binary) form
          Object binaryOrderDetails = detailsCache.get(keyConverter.convert(new OrderDetailsId(orderId, monotonicId)));
          if (binaryOrderDetails == null) {
             break;
          }
          results.add(new Object[]{binaryOrder, binaryOrderDetails});
          monotonicId++;
       }
    }

    Does this answer your question?

  5. Nicolas February 16th, 2011
    9:51 GMT

    Hi Ben,
    Thx for answering the question and sorry for taking a month to get back to you 🙂

    Cheers,
    Nico

    P.S.: I think Dave recommended your blog actually so thx to Dave as well !!!

  6. Bret Calvey September 12th, 2011
    14:52 GMT

    Hi,

    I have read this article with interest and I am looking at doing something similar in our system to reduce the number of network hops. It would be nice to just make one call to get all of our data instead of several calls (i.e. get the parent, get the child type As, get the child type Bs etc).

    The use case I want to experiment with is to look up all child items of a parent item by accessing the backing map directly.

    Let’s say I have two classes, P and C (parent and child) in a one-to-many relationship.

    The key of the C class contains the key of the P class so we use key association to locate related items on the same storage node.

    In our domain, we cannot derive what the child keys will be given the parent keys and it would be difficult to change our legacy system so that it generates child keys in this way (we would also have to totally change our database schema – so not an option). But the child type contains the ID of the parent type. We have indexes set up so that this “parentId” on the child is indexed.

    From what I have read above, it would be simple to find the child items if there was only one of them and it had the same ID as the parent.

    In my case, it looks like I have to scan through all of the child entries looking for matching items (i.e. where the child’s parent ID = the ID of the parent object).

    I am not too comfortable with having to iterate over potentially millions of items in order to find 2 or 3 records, so I wondered if there was any way I could take advantage of the indexes?

    Is there a way within an EntryProcessor to get the index information and look up the child entries given the parent ID so we do not have to iterate over everything?

    We are currently using Coherence 3.5, but I am aware that support for accessing backing maps from EntryProcessors has been improved in 3.7.

    In the “BackingMapContext” API docs (3.7), I can see this method…

    —————————————————
    java.util.Map getIndexMap()

    Return a map of indexes defined for the cache that this BackingMapContext is associated with. The returned map must be treated in the read-only manner.

    http://download.oracle.com/docs/cd/E18686_01/coh.37/e18683/com/tangosol/net/BackingMapContext.html#getIndexMap__
    ——————————————–

    I’m not sure, but I think this may be able to help me…??

    Has anyone tried anything similar before?

    I will keep experimenting and post any findings here…

    Thanks in advance,

    -Bret

  7. ben September 12th, 2011
    16:34 GMT

    Hi Bret

This is an interesting topic (for me anyway) and one close to my heart. Firstly though, are you sure you can collocate all your data based on key affinity? It is of course possible, but most domain models will not support it due to crosscutting keys (i.e. there is no single key that all objects share that can be used for partitioning).

    I’ll answer your question twice based on the answer to this:
(1) If the answer is yes – i.e. you can partition everything with the same key association – then there is no easy way to access the Coherence indexes from an entry processor. It is possible, but it involves doing some reflection on some non-Java classes that exist deep in the Coherence core (the Coherence guys have some crazy language that generates bytecode). If you are really keen I can dig out the code. However you should be able to use a PartitionAwareBackingMap to reduce the length of your traversals significantly, by limiting them to a single partition, without having to do anything too crazy.

    (2) However if you can’t ensure everything shares the same key – the more general use case (and the one we have on ODC) – you could try our solution. This is quite different. We split entities into Facts and Dimensions (like in a data warehouse snowflake schema) and then replicate the dimension data using CQCs. The result is “query nodes” that have all the Dimensions on (stuff with different keys). This is similar to the approach you refer to but is more efficient as we can apply indexes to the CQC’s and we have this funky Connected Replication Pattern that minimises our memory utilisation (important when you are replicating data)

    I described the whole approach in a presentation which you can view here if you are interested: http://www.benstopford.com/2011/01/27/beyond-the-data-grid-building-a-normalised-data-store-using-coherence

    I am also on the cusp of publishing a write up but it’s not quite ready yet. I’m happy to send you a draft copy if that is of use.

    B

  8. Bret Calvey September 13th, 2011
    13:11 GMT

    Hi Ben,

    Thanks for the quick reply.

    In our system, we have a domain object called “Event” that has several child type objects. Some of these child objects in turn may have child type objects. Basically, everything under an event uses Key Association so that all of the related data about an event is stored on the same partition as the event.

    Therefore, I think option 1) above may work for us.

    We’re still in the experimental stage with this at the moment – we are basically using just Coherence as a map and we want to start using some of these more advanced features.

    I’d be very interested in anything you can send me for either approach you suggest (I appreciate some of the information may be in the “draft” stage)

    Thank you very much for your help,

    -Bret

  9. ben September 13th, 2011
    16:50 GMT

    Hey Bret

So all your entities are "facts" in our model's terms. We do this too (joining facts) but we ensure that all downward references are key based. Sounds like your keys are the other way around. Our model is not necessary for your use case so would add unnecessary complexity.

    One quite simple option is to manage your own index by creating a cache that contains the ‘reverse index’ you need to join so you don’t have to do a scan. This is easy to maintain by simply configuring a trigger to keep these index caches in order when you add and remove from the caches.

    I’d suggest trying if first with a PartitionAwareBackingMap. You may find that the performance is actually ok if you have a high partition count. If it’s too slow implement the reverse index (or try using the coherence ones but as I said the code is a bit crazy).

    I’ll ask JK to comment too (a very knowledgeable colleague of mine)

  10. Jonathan Knight September 14th, 2011
    11:14 GMT

    Hi Bret,

    I work with Ben on ODC.

    If I understand you correctly then all your related facts are pinned to the same partition using key association so when you want to do a query you know all the data you want is on a single node.

    The obvious way to query the related facts is to get the relevant backing map and iterate over it looking for what you want. This is easy enough to code but could be a little slow for big backing maps. As Ben said using a PartitionAwareBackingMap would result in smaller backing maps to search through.

    Alternatively you could do Filter queries against the backing maps just like you do against a normal cache. Coherence has a utility class called InvocableMapHelper which can run Filter queries agains any Map. Note though that when running a query against a backing map you will get back a Set of Map.Entry instances that contain the Binary key and value, if you want then as proper objects you would need to convert them. This is not really going to be any different than iterating over the backing map yourself, it is just done in a single method call but…

    InvocableMapHelper has a query method that also allows you to provide a map of indexes so you can make the queries more efficient by using the indexes that are already on the specific cache – you just need to be able to get the index Map. As Ben has said the code for getting the indexes is bit awkward prior to 3.7 but it is possible.

Maintaining your own indexes is also possible, but there could be timing issues between your indexes mutating and the caches being mutated, as you would not have the locking that is associated with the built-in indexes, so a query that runs at the same time as a mutation might be inconsistent.

    Below are three different versions of a method that will allow you to query any backing map on the same service as the original BinaryEntry using a Filter and will also use any available relevant indexes.

    3.5

    @SuppressWarnings({"unchecked"})
    public Set<Map.Entry> queryBackingMap(String nameOfCacheToSearch, Filter filter, BinaryEntry entry) {
        Set<Map.Entry> results;
        BackingMapManagerContext context = entry.getContext();
        DistributedCache distributedCache = (DistributedCache) context.getCacheService();
        ValueExtractor storageExtractor = new ReflectionExtractor("getStorage");
        Object storage = storageExtractor.extract(distributedCache);
        if (storage != null) {
            ValueExtractor indexExtractor = new ReflectionExtractor("getIndexMap");
            Map indexMap = (Map) indexExtractor.extract(storage);
            Map backingMapToSearch = context.getBackingMap(nameOfCacheToSearch);
            results = InvocableMapHelper.query(backingMapToSearch, indexMap, filter, true, false, null);
        } else {
            results = Collections.emptySet();
        }
        return results;
    }
    

    3.6

    @SuppressWarnings({"unchecked"})
    public Set<Map.Entry> queryBackingMap(String nameOfCacheToSearch, Filter filter, BinaryEntry entry) {
        Set<Map.Entry> results;
        PartitionedCache partitionedCache = (PartitionedCache) entry.getContext().getCacheService();
        Object storage = partitionedCache.getStorage(nameOfCacheToSearch);
        if (storage != null) {
            ValueExtractor extractor = new ReflectionExtractor("getIndexMap");
            Map indexMap = (Map) extractor.extract(storage);
            Map backingMapToSearch = entry.getContext().getBackingMap(nameOfCacheToSearch);
            results = InvocableMapHelper.query(backingMapToSearch, indexMap, filter, true, false, null);
        } else {
            results = Collections.emptySet();
        }
        return results;
    }
    

    3.7

    @SuppressWarnings({"unchecked"})
    public Set<Map.Entry> queryBackingMap(String nameOfCacheToSearch, Filter filter, BinaryEntry entry) {
        Map backingMapToSearch = entry.getContext().getBackingMap(nameOfCacheToSearch);
        Map indexMap = entry.getBackingMapContext().getIndexMap();
        return InvocableMapHelper.query(backingMapToSearch, indexMap, filter, true, false, null);
    }
    

You can see that in the 3.5 and 3.6 versions we use ValueExtractors to call methods via reflection on various classes. This is because the code in the non-public parts of Coherence is not written in Java but in something called TDE (Tangosol Development Environment), which compiles to byte code that the Java compiler has trouble with, so we have to use reflection. I use IntelliJ as an IDE, and although IntelliJ complains about the 3.6 code and highlights it as errors, it still compiles. Also, as it is non-public, it can change between releases without any documentation or release notes. Version 3.7 is by far the easiest, as Oracle seem to be opening up more of the internals and exposing them via the public API.

    If you want to have a Set of the real key and value rather than the Binary versions then you can use the Coherence converter classes like this:

    BackingMapManagerContext context = entry.getContext();
    Converter keyUpConverter = context.getKeyFromInternalConverter();
    Converter keyDownConverter = context.getKeyToInternalConverter();
    Converter valueUpConverter = context.getValueFromInternalConverter();
    Converter valueDownConverter = context.getValueToInternalConverter();
    Set converted = new ConverterCollections.ConverterEntrySet(results, keyUpConverter, keyDownConverter, valueUpConverter, valueDownConverter);
    

    The code above basically wraps your Set<Map.Entry> in a ConverterEntrySet using the converters from the cache service. When you access Map.Entry values from the set the key and value in these will be converted to the proper Object values.

One other comment would be that using an EntryProcessor to perform your queries would be quite slow, as there is a lot of locking involved. It would be better to use a custom EntryAggregator: aggregators are read-only, do not involve so much locking, and tests have shown they run much quicker. The aggregate method of the aggregator is passed a Set of entries, which will be BinaryEntry instances, so you can work with them the same way you would with an EntryProcessor; you just cannot update them.

    JK

  11. ben September 14th, 2011
    13:39 GMT

    Thanks JK 🙂

  12. Bret Calvey September 14th, 2011
    14:12 GMT

    Hi Ben,

    Again, thanks for the quick response.

    I did think about maintaining our own index, but didn’t think of using Triggers to maintain them – nice idea!

    I’ve got several things to try out now – thanks for your help + will keep you posted!

    Cheers,

    -Bret

  13. Bret Calvey September 14th, 2011
    14:15 GMT

    Have also just noticed Jonathan’s post after refreshing the page…

    All great info, thanks for this – got plenty to try out now!

    Ta,

    -Bret
