Monday, September 9, 2013

Coherence Data Affinity Part 2 using KeyAssociator


In the earlier section we defined the association in the key object by implementing KeyAssociation.
The other way to do it is by writing a class that implements KeyAssociator and configuring that in cache-config.xml.


1. In this technique we will first create a key class. The difference between the earlier key and the current key is. The earlier one implements KeyAssociation but the current one doesn't.

package com.coher.dto;
import com.tangosol.io.pof.annotation.Portable;
import com.tangosol.io.pof.annotation.PortableProperty;

import java.io.Serializable;
@Portable
public class OrderLineItemKeyBean implements Serializable
{
  @PortableProperty
  private String mOrderNumber;
  @PortableProperty
  private String mLineItemNumber;
  public OrderLineItemKeyBean()
  {
    super();
  }

  public OrderLineItemKeyBean(String pOrderNumber, String pLineItemNumber)
  {
    super();
    this.mOrderNumber = pOrderNumber;
    this.mLineItemNumber = pLineItemNumber;
  }

  public void setOrderNumber(String pOrderNumber)
  {
    this.mOrderNumber = pOrderNumber;
  }

  public String getOrderNumber()
  {
    return mOrderNumber;
  }

  public void setLineItemNumber(String pLineItemNumber)
  {
    this.mLineItemNumber = pLineItemNumber;
  }

  public String getLineItemNumber()
  {
    return mLineItemNumber;
  }
  public String toString()
  {
    return mOrderNumber +" "+ mLineItemNumber;
  }
}

 
2. Lets now create a KeyAssociator class. This class actually implements KeyAssociator and is configured in the cache-config.xml. The getAssociatedKey method actually returns the associated key.


package com.coher.dataaffinity;
import com.coher.dto.OrderLineItemKeyBean;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.partition.KeyAssociator;

public class MyKeyAssociator
  implements KeyAssociator
{
  public MyKeyAssociator()
  {
    super();
  }

  public void init(PartitionedService partitionedService)
  {

  }
  public Object getAssociatedKey(Object object)
  {
    if (object instanceof OrderLineItemKeyBean)
    {
      return ((OrderLineItemKeyBean) object).getOrderNumber();
    }
    else
    {
      return object;
    }
  }
}



3. Configure the KeyAssociator class in cache-config.xml

<?xml version="1.0" ?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
<scope-name>OneSystemScope</scope-name>
<defaults>
  <serializer>pof</serializer>
</defaults>
<caching-scheme-mapping>
  <cache-mapping>
   <cache-name>mycache</cache-name>
   <scheme-name>dist-default</scheme-name>
   <init-params>
    <init-param>
     <param-name>size-limit</param-name>
     <param-value>5000</param-value>
    </init-param>
   </init-params>
  </cache-mapping>
  <cache-mapping>
   <cache-name>DistributedOrders</cache-name>
   <scheme-name>dist-default</scheme-name>
   <init-params>
    <init-param>
     <param-name>size-limit</param-name>
     <param-value>5000</param-value>
    </init-param>
   </init-params>
  </cache-mapping>
  <cache-mapping>
   <cache-name>DistributedOrderLines</cache-name>
   <scheme-name>dist-default</scheme-name>
   <init-params>
    <init-param>
     <param-name>size-limit</param-name>
     <param-value>5000</param-value>
    </init-param>
   </init-params>
  </cache-mapping>
</caching-scheme-mapping>
<caching-schemes>
  <distributed-scheme>
   <scheme-name>dist-default</scheme-name>
   <service-name>DistributedCache</service-name>
   <thread-count>5</thread-count>
   <backup-count>1</backup-count>
   <backup-storage>
    <type>file-mapped</type>
    <initial-size>1M</initial-size>
    <maximum-size>5G</maximum-size>
    <directory>/software/CoherenceInsall/coherence/backup</directory>
   </backup-storage>
   <key-associator>
    <class-name>com.coher.dataaffinity.MyKeyAssociator</class-name>
   </key-associator>
   <backing-map-scheme>
    <overflow-scheme>
     <scheme-name>LocalMemoryWithDiskOverflow</scheme-name>
     <front-scheme>
      <local-scheme>
       <high-units>300000</high-units>
      </local-scheme>
     </front-scheme>
     <back-scheme>
      <external-scheme>
       <scheme-name>DiskScheme</scheme-name>
       <nio-file-manager>
        <initial-size>1MB</initial-size>
        <maximum-size>1024MB</maximum-size>
        <directory>/software/CoherenceInstall/coherence/backup</directory>
       </nio-file-manager>
      </external-scheme>
     </back-scheme>
    </overflow-scheme>
   </backing-map-scheme>
   <autostart>true</autostart>
  </distributed-scheme>
  <invocation-scheme>
   <scheme-name>InvocationService</scheme-name>
   <service-name>InvocationService</service-name>
   <thread-count>5</thread-count>
   <autostart>true</autostart>
  </invocation-scheme>
</caching-schemes>
</cache-config>

 
4. Add the key class to pof-config.xml. If you are using POF.
 
5. Create a cache loader class to test
 
package com.coher.dataaffinity;
import com.coher.dto.OrderBean;
import com.coher.dto.OrderLineItemBean;
import com.coher.dto.OrderLineItemKeyBean;

import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;

import java.util.Random;
public class DataAffinityLoader
{
  public DataAffinityLoader()
  {
    super();
  }

  public static void main(String[] args)
  {
    NamedCache orderCache = CacheFactory.getCache("DistributedOrders");
    NamedCache orderLineCache =
      CacheFactory.getCache("DistributedOrderLines");
    orderCache.clear();
    orderLineCache.clear();

    int k = 0;
    for (int i = 0; i < 4; i++)
    {

      OrderBean ob = new OrderBean("" + i, "Customer" + i);
      orderCache.put(ob.getOrderNumber(), ob);
      Random r = new Random();
      int l = r.nextInt(20);
      for (int j = 0; j <= 3; j++)
      {
        k++;
        OrderLineItemBean olib =
          new OrderLineItemBean(ob.getOrderNumber(), "lineitem" + k,
                                "" + k, k);
        orderLineCache.put(new OrderLineItemKeyBean(ob.getOrderNumber(),
                                              olib.getItemNumber()), olib);
      }
    }
  }

}


6. Create a class to test the affinity

package com.coher.dataaffinity;
import com.coher.dto.OrderLineItemKeyBean;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.util.Filter;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.KeyAssociatedFilter;

import java.util.Set;
public class DataAffinityCacheViewer
{
  public DataAffinityCacheViewer()
  {
    super();
  }

  public static void main(String[] args)
  {
    NamedCache orderCache = CacheFactory.getCache("DistributedOrders");
    NamedCache orderLineCache = CacheFactory.getCache("DistributedOrderLines");
    System.out.println("orders Cache.size() = " + orderCache.size());
    PartitionedService ps1 = (PartitionedService) orderCache.getCacheService();
    Set<String> odrKeySet = (Set<String>) orderCache.keySet();
    for (String key: odrKeySet)
    {
      Member member = ps1.getKeyOwner(key);
      System.out.println("Coherence member:" + member.getId() + "; order key:" + key );
      EqualsFilter filterEq = new EqualsFilter("getOrderNumber", key);
      Filter filterAsc = new KeyAssociatedFilter(filterEq, key);
      Set<OrderLineItemKeyBean> ONKeySet = (Set<OrderLineItemKeyBean>) orderLineCache.keySet(filterAsc);
      PartitionedService ps2 = (PartitionedService) orderLineCache.getCacheService();
      for (OrderLineItemKeyBean key1: ONKeySet)
      {
        Member member1 = ps2.getKeyOwner(key1);
        System.out.println("              Coherence member:" + member1.getId() + "; Order Line Key:" + key1 );
      }
    } 

  }
}

 
Running the test.

a. Start two instances of coherence
b. first run DataAffinityLoader to load data
c. Now run DataAffinityCacheViewer to test affinity
orders Cache.size() = 4
Coherence member:3; order key:0
              Coherence member:3; Order Line Key:0 lineitem4
              Coherence member:3; Order Line Key:0 lineitem1
              Coherence member:3; Order Line Key:0 lineitem3
              Coherence member:3; Order Line Key:0 lineitem2
Coherence member:3; order key:1
              Coherence member:3; Order Line Key:1 lineitem7
              Coherence member:3; Order Line Key:1 lineitem8
              Coherence member:3; Order Line Key:1 lineitem6
              Coherence member:3; Order Line Key:1 lineitem5
Coherence member:1; order key:2
              Coherence member:1; Order Line Key:2 lineitem12
              Coherence member:1; Order Line Key:2 lineitem11
              Coherence member:1; Order Line Key:2 lineitem9
              Coherence member:1; Order Line Key:2 lineitem10
Coherence member:1; order key:3
              Coherence member:1; Order Line Key:3 lineitem14
              Coherence member:1; Order Line Key:3 lineitem15
              Coherence member:1; Order Line Key:3 lineitem16
              Coherence member:1; Order Line Key:3 lineitem13