Monday, September 9, 2013
Coherence Data Affinity Part 2 using KeyAssociator
In the earlier section we defined the association in the key object by implementing KeyAssociation.
The other way to do it is by writing a class that implements KeyAssociator and configuring that in cache-config.xml.
1. In this technique we will first create a key class. The difference between the earlier key and the current key is. The earlier one implements KeyAssociation but the current one doesn't.
package com.coher.dto;
import com.tangosol.io.pof.annotation.Portable;
import com.tangosol.io.pof.annotation.PortableProperty;
import java.io.Serializable;
@Portable
public class OrderLineItemKeyBean implements Serializable
{
@PortableProperty
private String mOrderNumber;
@PortableProperty
private String mLineItemNumber;
public OrderLineItemKeyBean()
{
super();
}
public OrderLineItemKeyBean(String pOrderNumber, String pLineItemNumber)
{
super();
this.mOrderNumber = pOrderNumber;
this.mLineItemNumber = pLineItemNumber;
}
public void setOrderNumber(String pOrderNumber)
{
this.mOrderNumber = pOrderNumber;
}
public String getOrderNumber()
{
return mOrderNumber;
}
public void setLineItemNumber(String pLineItemNumber)
{
this.mLineItemNumber = pLineItemNumber;
}
public String getLineItemNumber()
{
return mLineItemNumber;
}
public String toString()
{
return mOrderNumber +" "+ mLineItemNumber;
}
}
2. Lets now create a KeyAssociator class. This class actually implements KeyAssociator and is configured in the cache-config.xml. The getAssociatedKey method actually returns the associated key.
package com.coher.dataaffinity;
import com.coher.dto.OrderLineItemKeyBean;
import com.tangosol.net.PartitionedService;
import com.tangosol.net.partition.KeyAssociator;
public class MyKeyAssociator
implements KeyAssociator
{
public MyKeyAssociator()
{
super();
}
public void init(PartitionedService partitionedService)
{
}
public Object getAssociatedKey(Object object)
{
if (object instanceof OrderLineItemKeyBean)
{
return ((OrderLineItemKeyBean) object).getOrderNumber();
}
else
{
return object;
}
}
}
3. Configure the KeyAssociator class in cache-config.xml
<?xml version="1.0" ?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
<scope-name>OneSystemScope</scope-name>
<defaults>
<serializer>pof</serializer>
</defaults>
<caching-scheme-mapping>
<cache-mapping>
<cache-name>mycache</cache-name>
<scheme-name>dist-default</scheme-name>
<init-params>
<init-param>
<param-name>size-limit</param-name>
<param-value>5000</param-value>
</init-param>
</init-params>
</cache-mapping>
<cache-mapping>
<cache-name>DistributedOrders</cache-name>
<scheme-name>dist-default</scheme-name>
<init-params>
<init-param>
<param-name>size-limit</param-name>
<param-value>5000</param-value>
</init-param>
</init-params>
</cache-mapping>
<cache-mapping>
<cache-name>DistributedOrderLines</cache-name>
<scheme-name>dist-default</scheme-name>
<init-params>
<init-param>
<param-name>size-limit</param-name>
<param-value>5000</param-value>
</init-param>
</init-params>
</cache-mapping>
</caching-scheme-mapping>
<caching-schemes>
<distributed-scheme>
<scheme-name>dist-default</scheme-name>
<service-name>DistributedCache</service-name>
<thread-count>5</thread-count>
<backup-count>1</backup-count>
<backup-storage>
<type>file-mapped</type>
<initial-size>1M</initial-size>
<maximum-size>5G</maximum-size>
<directory>/software/CoherenceInsall/coherence/backup</directory>
</backup-storage>
<key-associator>
<class-name>com.coher.dataaffinity.MyKeyAssociator</class-name>
</key-associator> <backing-map-scheme>
<overflow-scheme>
<scheme-name>LocalMemoryWithDiskOverflow</scheme-name>
<front-scheme>
<local-scheme>
<high-units>300000</high-units>
</local-scheme>
</front-scheme>
<back-scheme>
<external-scheme>
<scheme-name>DiskScheme</scheme-name>
<nio-file-manager>
<initial-size>1MB</initial-size>
<maximum-size>1024MB</maximum-size>
<directory>/software/CoherenceInstall/coherence/backup</directory>
</nio-file-manager>
</external-scheme>
</back-scheme>
</overflow-scheme>
</backing-map-scheme>
<autostart>true</autostart>
</distributed-scheme>
<invocation-scheme>
<scheme-name>InvocationService</scheme-name>
<service-name>InvocationService</service-name>
<thread-count>5</thread-count>
<autostart>true</autostart>
</invocation-scheme>
</caching-schemes>
</cache-config>
4. Add the key class to pof-config.xml. If you are using POF.
5. Create a cache loader class to test
package com.coher.dataaffinity;
import com.coher.dto.OrderBean;
import com.coher.dto.OrderLineItemBean;
import com.coher.dto.OrderLineItemKeyBean;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import java.util.Random;
public class DataAffinityLoader
{
public DataAffinityLoader()
{
super();
}
public static void main(String[] args)
{
NamedCache orderCache = CacheFactory.getCache("DistributedOrders");
NamedCache orderLineCache =
CacheFactory.getCache("DistributedOrderLines");
orderCache.clear();
orderLineCache.clear();
int k = 0;
for (int i = 0; i < 4; i++)
{
OrderBean ob = new OrderBean("" + i, "Customer" + i);
orderCache.put(ob.getOrderNumber(), ob);
Random r = new Random();
int l = r.nextInt(20);
for (int j = 0; j <= 3; j++)
{
k++;
OrderLineItemBean olib =
new OrderLineItemBean(ob.getOrderNumber(), "lineitem" + k,
"" + k, k);
orderLineCache.put(new OrderLineItemKeyBean(ob.getOrderNumber(),
olib.getItemNumber()), olib);
}
}
}
}
6. Create a class to test the affinity
package com.coher.dataaffinity;
import com.coher.dto.OrderLineItemKeyBean;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.Member;
import com.tangosol.net.NamedCache;
import com.tangosol.net.PartitionedService;
import com.tangosol.util.Filter;
import com.tangosol.util.filter.EqualsFilter;
import com.tangosol.util.filter.KeyAssociatedFilter;
import java.util.Set;
public class DataAffinityCacheViewer
{
public DataAffinityCacheViewer()
{
super();
}
public static void main(String[] args)
{
NamedCache orderCache = CacheFactory.getCache("DistributedOrders");
NamedCache orderLineCache = CacheFactory.getCache("DistributedOrderLines");
System.out.println("orders Cache.size() = " + orderCache.size());
PartitionedService ps1 = (PartitionedService) orderCache.getCacheService();
Set<String> odrKeySet = (Set<String>) orderCache.keySet();
for (String key: odrKeySet)
{
Member member = ps1.getKeyOwner(key);
System.out.println("Coherence member:" + member.getId() + "; order key:" + key );
EqualsFilter filterEq = new EqualsFilter("getOrderNumber", key);
Filter filterAsc = new KeyAssociatedFilter(filterEq, key);
Set<OrderLineItemKeyBean> ONKeySet = (Set<OrderLineItemKeyBean>) orderLineCache.keySet(filterAsc);
PartitionedService ps2 = (PartitionedService) orderLineCache.getCacheService();
for (OrderLineItemKeyBean key1: ONKeySet)
{
Member member1 = ps2.getKeyOwner(key1);
System.out.println(" Coherence member:" + member1.getId() + "; Order Line Key:" + key1 );
}
}
}
}
Running the test.
a. Start two instances of coherence
b. first run DataAffinityLoader to load data
c. Now run DataAffinityCacheViewer to test affinity
orders Cache.size() = 4
Coherence member:3; order key:0
Coherence member:3; Order Line Key:0 lineitem4
Coherence member:3; Order Line Key:0 lineitem1
Coherence member:3; Order Line Key:0 lineitem3
Coherence member:3; Order Line Key:0 lineitem2
Coherence member:3; order key:1
Coherence member:3; Order Line Key:1 lineitem7
Coherence member:3; Order Line Key:1 lineitem8
Coherence member:3; Order Line Key:1 lineitem6
Coherence member:3; Order Line Key:1 lineitem5
Coherence member:1; order key:2
Coherence member:1; Order Line Key:2 lineitem12
Coherence member:1; Order Line Key:2 lineitem11
Coherence member:1; Order Line Key:2 lineitem9
Coherence member:1; Order Line Key:2 lineitem10
Coherence member:1; order key:3
Coherence member:1; Order Line Key:3 lineitem14
Coherence member:1; Order Line Key:3 lineitem15
Coherence member:1; Order Line Key:3 lineitem16
Coherence member:1; Order Line Key:3 lineitem13
Subscribe to:
Posts (Atom)