Thursday, June 20, 2013
Coherence Distributed Bulk Cache Loading using Invocation Service
When loading a large amount of data into a cache, it is more efficient to divide the load among the cache server members. Below is a demonstration of how to do this.
My example assumes that you keep the files in the directory structure below.
Coherence Extract Directory
-----\Coherence Extract Directory
---\applib
---\CoherenceWork.jar
---\projectLoader.jar
---\bin
---\doc
---\lib
---\run
---\CacheConfig.xml
---\CacheOverride.xml
---\cacheproxy1.cmd
---\CacheProxyOverride.xml
---\cacheserver.cmd
---\pof-config.xml
---\ProxyConfig1.xml
1. Create cache operational config override file
CacheOverride.xml
<?xml version="1.0" ?>
<!--
  Operational override for the cache server members.
  Each element that carries a system-property attribute names the JVM system
  property through which its value can be supplied instead.
-->
<coherence xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
  <cluster-config>
    <!-- Identity this member reports to the cluster; the role is "CacheServer" -->
    <member-identity>
      <cluster-name system-property="tangosol.coherence.cluster">DataCluster</cluster-name>
      <process-name system-property="tangosol.coherence.process">CoherenceCacheServer</process-name>
      <role-name system-property="tangosol.coherence.role">CacheServer</role-name>
    </member-identity>
    <!-- Unicast address and starting port; port-auto-adjust is enabled -->
    <unicast-listener>
      <address system-property="tangosol.coherence.localhost">localhost</address>
      <port system-property="tangosol.coherence.localport">8088</port>
      <port-auto-adjust system-property="tangosol.coherence.localport.adjust">true</port-auto-adjust>
    </unicast-listener>
  </cluster-config>
  <!-- Log to stdout at severity level 9 -->
  <logging-config>
    <destination system-property="tangosol.coherence.log">stdout</destination>
    <severity-level system-property="tangosol.coherence.log.level">9</severity-level>
  </logging-config>
</coherence>
2. Let's create a cache config file, CacheConfig.xml
<?xml version="1.0" ?>
<!--
  Cache configuration for the cache server members: maps the cache "mycache"
  to a distributed scheme and declares an invocation service.
-->
<cache-config xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <scope-name>OneSystemScope</scope-name>
  <defaults>
    <!-- POF is the default serializer for the services declared in this file -->
    <serializer>pof</serializer>
  </defaults>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>mycache</cache-name>
      <scheme-name>dist-default</scheme-name>
      <!-- NOTE(review): no scheme in this file references a {size-limit} macro; confirm this parameter is needed -->
      <init-params>
        <init-param>
          <param-name>size-limit</param-name>
          <param-value>5000</param-value>
        </init-param>
      </init-params>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>dist-default</scheme-name>
      <service-name>DistributedCache</service-name>
      <thread-count>5</thread-count>
      <backup-count>1</backup-count>
      <!-- File-mapped backup storage. The directory previously read "CoherenceInsall";
           it now matches the install path used by the nio-file-manager below. -->
      <backup-storage>
        <type>file-mapped</type>
        <initial-size>1M</initial-size>
        <maximum-size>5G</maximum-size>
        <directory>/software/CoherenceInstall/coherence/backup</directory>
      </backup-storage>
      <backing-map-scheme>
        <!-- In-memory front map with an NIO file-backed overflow map behind it -->
        <overflow-scheme>
          <scheme-name>LocalMemoryWithDiskOverflow</scheme-name>
          <front-scheme>
            <local-scheme>
              <high-units>300000</high-units>
            </local-scheme>
          </front-scheme>
          <back-scheme>
            <external-scheme>
              <scheme-name>DiskScheme</scheme-name>
              <nio-file-manager>
                <initial-size>1MB</initial-size>
                <maximum-size>1024MB</maximum-size>
                <directory>/software/CoherenceInstall/coherence/backup</directory>
              </nio-file-manager>
            </external-scheme>
          </back-scheme>
        </overflow-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
    <!-- Invocation service looked up by name ("InvocationService") in LoaderAgentTest -->
    <invocation-scheme>
      <scheme-name>InvocationService</scheme-name>
      <service-name>InvocationService</service-name>
      <thread-count>5</thread-count>
      <autostart>true</autostart>
    </invocation-scheme>
  </caching-schemes>
</cache-config>
3. Create a command script to start the cache server instances: cacheserver.cmd
@echo off
@rem
@rem Starts a Coherence cache server (com.tangosol.net.DefaultCacheServer)
@rem with local storage enabled.
@rem Usage: cacheserver.cmd [-jmx] [argument passed on to DefaultCacheServer]
@rem
setlocal

:config
@rem Coherence installation directory: the parent of the directory holding this script
set "coherence_home=%~dp0\.."
@rem JVM heap size, used for both -Xms and -Xmx
set memory=512m

:start
if not exist "%coherence_home%\lib\coherence.jar" goto instructions
@rem Quoted assignments keep a JAVA_HOME containing spaces or parentheses intact
if "%java_home%"=="" (set "java_exec=java") else (set "java_exec=%java_home%\bin\java")

:launch
@rem An optional leading -jmx enables remote JMX management on port 7771;
@rem it is shifted away so that %1 below is the next argument, if any
if "%~1"=="-jmx" (
  set jmxproperties=-Dcom.sun.management.jmxremote -Dtangosol.coherence.management=all -Dcom.sun.management.jmxremote.port=7771 -Dtangosol.coherence.management.remote=true
  shift
)
set java_opts=-Xms%memory% -Xmx%memory% %jmxproperties%
@rem The executable path is quoted so that a java_home with spaces still launches
"%java_exec%" -server -showversion %java_opts% -Dtangosol.coherence.override=CacheOverride.xml -Dtangosol.coherence.distributed.localstorage=true -Dtangosol.coherence.cacheconfig=CacheConfig.xml -Dtangosol.coherence.edition=EE -cp "%coherence_home%\lib\coherence.jar";"%coherence_home%\applib\projectLoader.jar";"%coherence_home%\applib\CoherenceWork.jar" com.tangosol.net.DefaultCacheServer %1
goto exit

:instructions
@rem Reached when coherence.jar is not found under the lib directory
echo Could not find "%coherence_home%\lib\coherence.jar"
echo Usage:
echo   ^<coherence_home^>\run\cacheserver.cmd [-jmx]
goto exit

:exit
endlocal
@echo on
4. Create an override file for the proxy: CacheProxyOverride.xml
<?xml version="1.0" ?>
<!--
  Operational override for the proxy member.
  Each element that carries a system-property attribute names the JVM system
  property through which its value can be supplied instead.
-->
<coherence xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
  <cluster-config>
    <!-- Same cluster name as the cache servers, but with the role "CacheProxy" -->
    <member-identity>
      <cluster-name system-property="tangosol.coherence.cluster">DataCluster</cluster-name>
      <process-name system-property="tangosol.coherence.process">CoherenceCacheServer</process-name>
      <role-name system-property="tangosol.coherence.role">CacheProxy</role-name>
    </member-identity>
    <!-- Unicast address and starting port; port-auto-adjust is enabled -->
    <unicast-listener>
      <address system-property="tangosol.coherence.localhost">localhost</address>
      <port system-property="tangosol.coherence.localport">8088</port>
      <port-auto-adjust system-property="tangosol.coherence.localport.adjust">true</port-auto-adjust>
    </unicast-listener>
  </cluster-config>
  <!-- Log to stdout at severity level 9 -->
  <logging-config>
    <destination system-property="tangosol.coherence.log">stdout</destination>
    <severity-level system-property="tangosol.coherence.log.level">9</severity-level>
  </logging-config>
</coherence>
5. Create a cache config for the proxy. ProxyConfig1.xml
<?xml version="1.0" ?>
<!--
  Cache configuration for the proxy member: the same "mycache" mapping and
  distributed scheme as the cache servers, plus a TCP proxy service on
  localhost:9098 and an invocation service.
-->
<cache-config xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <scope-name>OneSystemScope</scope-name>
  <defaults>
    <!-- POF is the default serializer for the services declared in this file -->
    <serializer>pof</serializer>
  </defaults>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>mycache</cache-name>
      <scheme-name>dist-default</scheme-name>
      <!-- NOTE(review): no scheme in this file references a {size-limit} macro; confirm this parameter is needed -->
      <init-params>
        <init-param>
          <param-name>size-limit</param-name>
          <param-value>5000</param-value>
        </init-param>
      </init-params>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <distributed-scheme>
      <scheme-name>dist-default</scheme-name>
      <service-name>DistributedCache</service-name>
      <thread-count>5</thread-count>
      <backup-count>1</backup-count>
      <!-- File-mapped backup storage. The directory previously read "CoherenceInsall";
           it now matches the install path used by the nio-file-manager below. -->
      <backup-storage>
        <type>file-mapped</type>
        <initial-size>1M</initial-size>
        <maximum-size>5G</maximum-size>
        <directory>/software/CoherenceInstall/coherence/backup</directory>
      </backup-storage>
      <backing-map-scheme>
        <!-- In-memory front map with an NIO file-backed overflow map behind it -->
        <overflow-scheme>
          <scheme-name>LocalMemoryWithDiskOverflow</scheme-name>
          <front-scheme>
            <local-scheme>
              <high-units>300000</high-units>
            </local-scheme>
          </front-scheme>
          <back-scheme>
            <external-scheme>
              <scheme-name>DiskScheme</scheme-name>
              <nio-file-manager>
                <initial-size>1MB</initial-size>
                <maximum-size>1024MB</maximum-size>
                <directory>/software/CoherenceInstall/coherence/backup</directory>
              </nio-file-manager>
            </external-scheme>
          </back-scheme>
        </overflow-scheme>
      </backing-map-scheme>
      <autostart>true</autostart>
    </distributed-scheme>
    <!-- TCP proxy service that accepts client connections on localhost:9098 -->
    <proxy-scheme>
      <service-name>ExtendTcpProxyService</service-name>
      <thread-count>5</thread-count>
      <acceptor-config>
        <tcp-acceptor>
          <local-address>
            <address>localhost</address>
            <port>9098</port>
          </local-address>
        </tcp-acceptor>
        <!-- POF context built from pof-config.xml; param-type is spelled
             java.lang.String to match the client-side configuration -->
        <serializer>
          <instance>
            <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
            <init-params>
              <init-param>
                <param-type>java.lang.String</param-type>
                <param-value>pof-config.xml</param-value>
              </init-param>
            </init-params>
          </instance>
        </serializer>
      </acceptor-config>
      <!-- Both the cache service proxy and the invocation service proxy are enabled -->
      <proxy-config>
        <cache-service-proxy>
          <enabled>true</enabled>
        </cache-service-proxy>
        <invocation-service-proxy>
          <enabled>true</enabled>
        </invocation-service-proxy>
      </proxy-config>
      <autostart>true</autostart>
    </proxy-scheme>
    <invocation-scheme>
      <scheme-name>InvocationService</scheme-name>
      <service-name>InvocationService</service-name>
      <thread-count>5</thread-count>
      <autostart>true</autostart>
    </invocation-scheme>
  </caching-schemes>
</cache-config>
6. Create a command to start the proxy server cacheproxy1.cmd
@echo off
@rem
@rem Starts the Coherence proxy server (com.tangosol.net.DefaultCacheServer
@rem configured with ProxyConfig1.xml) with local storage disabled.
@rem Usage: cacheproxy1.cmd [-jmx] [argument passed on to DefaultCacheServer]
@rem
setlocal

:config
@rem Coherence installation directory: the parent of the directory holding this script
set "coherence_home=%~dp0\.."
@rem JVM heap size, used for both -Xms and -Xmx
set memory=512m

:start
if not exist "%coherence_home%\lib\coherence.jar" goto instructions
@rem Quoted assignments keep a JAVA_HOME containing spaces or parentheses intact
if "%java_home%"=="" (set "java_exec=java") else (set "java_exec=%java_home%\bin\java")

:launch
@rem An optional leading -jmx enables remote JMX management on port 7772;
@rem it is shifted away so that %1 below is the next argument, if any
if "%~1"=="-jmx" (
  set jmxproperties=-Dcom.sun.management.jmxremote -Dtangosol.coherence.management=all -Dcom.sun.management.jmxremote.port=7772 -Dtangosol.coherence.management.remote=true
  shift
)
set java_opts=-Xms%memory% -Xmx%memory% %jmxproperties%
@rem The executable path is quoted so that a java_home with spaces still launches
"%java_exec%" -server -showversion %java_opts% -Dtangosol.coherence.override=CacheProxyOverride.xml -Dtangosol.coherence.distributed.localstorage=false -Dtangosol.coherence.cacheconfig=ProxyConfig1.xml -Dtangosol.coherence.edition=EE -cp "%coherence_home%\lib\coherence.jar";"%coherence_home%\applib\CoherenceWork.jar";"%coherence_home%\applib\projectLoader.jar" com.tangosol.net.DefaultCacheServer %1
goto exit

:instructions
@rem Reached when coherence.jar is not found under the lib directory
echo Could not find "%coherence_home%\lib\coherence.jar"
echo Usage:
echo   ^<coherence_home^>\run\cacheproxy1.cmd [-jmx]
goto exit

:exit
endlocal
@echo on
Now we have all the cache cluster configurations ready. We now need to write the client programs and configurations
7. Create a pof-config.xml and place a copy of it in the run directory
<?xml version="1.0" encoding="UTF-8" ?>
<!--
  POF configuration: registers the application classes OrderBean (type id 1001)
  and LoaderAgent (type id 1002) on top of the types included from
  coherence-pof-config.xml. Both use PofAnnotationSerializer.
-->
<!-- xsi:schemaLocation is a namespace-URI / schema-location pair; the original
     value was a single token and therefore not a valid pair. -->
<pof-config xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
            xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config coherence-pof-config.xsd">
  <user-type-list>
    <include>coherence-pof-config.xml</include>
    <user-type>
      <type-id>1001</type-id>
      <class-name>com.coher.dto.OrderBean</class-name>
      <serializer>
        <class-name>com.tangosol.io.pof.PofAnnotationSerializer</class-name>
        <!-- Constructor arguments: the type id, the class, and a boolean flag
             (NOTE(review): presumably the auto-index switch; confirm against the
             PofAnnotationSerializer constructor) -->
        <init-params>
          <init-param>
            <param-type>int</param-type>
            <param-value>{type-id}</param-value>
          </init-param>
          <init-param>
            <param-type>java.lang.Class</param-type>
            <param-value>{class}</param-value>
          </init-param>
          <init-param>
            <param-type>boolean</param-type>
            <param-value>true</param-value>
          </init-param>
        </init-params>
      </serializer>
    </user-type>
    <user-type>
      <type-id>1002</type-id>
      <class-name>com.coher.loader.LoaderAgent</class-name>
      <serializer>
        <class-name>com.tangosol.io.pof.PofAnnotationSerializer</class-name>
        <!-- Same three constructor arguments as for OrderBean above -->
        <init-params>
          <init-param>
            <param-type>int</param-type>
            <param-value>{type-id}</param-value>
          </init-param>
          <init-param>
            <param-type>java.lang.Class</param-type>
            <param-value>{class}</param-value>
          </init-param>
          <init-param>
            <param-type>boolean</param-type>
            <param-value>true</param-value>
          </init-param>
        </init-params>
      </serializer>
    </user-type>
  </user-type-list>
  <allow-interfaces>true</allow-interfaces>
  <allow-subclasses>true</allow-subclasses>
</pof-config>
8. Create a local cache config
<?xml version="1.0" ?>
<!--
  Client-side cache configuration: maps "mycache" to a remote cache scheme,
  and declares a remote invocation scheme and an invocation service.
-->
<!-- xsi:schemaLocation is a namespace-URI / schema-location pair; the original
     value omitted the namespace URI. -->
<cache-config xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <scope-name>OneSystemScope</scope-name>
  <defaults>
    <!-- POF is the default serializer for the services declared in this file -->
    <serializer>pof</serializer>
  </defaults>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>mycache</cache-name>
      <scheme-name>extend-dist</scheme-name>
      <!-- NOTE(review): no scheme in this file references a {size-limit} macro; confirm this parameter is needed -->
      <init-params>
        <init-param>
          <param-name>size-limit</param-name>
          <param-value>5000</param-value>
        </init-param>
      </init-params>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <!-- Remote cache service that connects over TCP to the listed proxy addresses -->
    <remote-cache-scheme>
      <scheme-name>extend-dist</scheme-name>
      <service-name>ExtendTcpCacheService</service-name>
      <initiator-config>
        <tcp-initiator>
          <remote-addresses>
            <!-- include a socket-address for each proxy the client may connect to -->
            <socket-address>
              <address>10.10.2.106</address>
              <port>9098</port>
            </socket-address>
            <!-- NOTE(review): ProxyConfig1.xml listens on 9098 only; 9099 presumably
                 belongs to a second proxy that is not shown here -->
            <socket-address>
              <address>10.10.2.106</address>
              <port>9099</port>
            </socket-address>
          </remote-addresses>
          <connect-timeout>10s</connect-timeout>
        </tcp-initiator>
        <outgoing-message-handler>
          <request-timeout>60s</request-timeout>
        </outgoing-message-handler>
        <serializer>
          <instance>
            <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
            <init-params>
              <init-param>
                <param-type>java.lang.String</param-type>
                <param-value>pof-config.xml</param-value>
              </init-param>
            </init-params>
          </instance>
        </serializer>
      </initiator-config>
    </remote-cache-scheme>
    <!-- Remote invocation service that connects over TCP to the same proxy
         addresses as the remote cache scheme above. (The original comment
         described a near cache, which this file does not define.) -->
    <remote-invocation-scheme>
      <scheme-name>extend-invocation</scheme-name>
      <service-name>ExtendTcpInvocationService</service-name>
      <initiator-config>
        <tcp-initiator>
          <remote-addresses>
            <!-- include a socket-address for each proxy the client may connect to -->
            <socket-address>
              <address>10.10.2.106</address>
              <port>9098</port>
            </socket-address>
            <socket-address>
              <address>10.10.2.106</address>
              <port>9099</port>
            </socket-address>
          </remote-addresses>
          <connect-timeout>60s</connect-timeout>
        </tcp-initiator>
        <outgoing-message-handler>
          <request-timeout>20s</request-timeout>
        </outgoing-message-handler>
        <serializer>
          <instance>
            <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
            <init-params>
              <init-param>
                <param-type>java.lang.String</param-type>
                <param-value>pof-config.xml</param-value>
              </init-param>
            </init-params>
          </instance>
        </serializer>
      </initiator-config>
    </remote-invocation-scheme>
    <!-- Invocation service looked up by name ("InvocationService") in LoaderAgentTest -->
    <invocation-scheme>
      <scheme-name>InvocationService</scheme-name>
      <service-name>InvocationService</service-name>
      <thread-count>5</thread-count>
      <autostart>true</autostart>
    </invocation-scheme>
  </caching-schemes>
</cache-config>
9. Create a local Cache Override file
<?xml version="1.0" ?>
<!--
  Operational override for the client.
  Each element that carries a system-property attribute names the JVM system
  property through which its value can be supplied instead.
-->
<coherence xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
  <cluster-config>
    <!-- Same cluster name as the servers, but with the role "CacheClient" -->
    <member-identity>
      <cluster-name system-property="tangosol.coherence.cluster">DataCluster</cluster-name>
      <process-name system-property="tangosol.coherence.process">CoherenceCacheServer</process-name>
      <role-name system-property="tangosol.coherence.role">CacheClient</role-name>
    </member-identity>
  </cluster-config>
  <!-- Log to stdout at severity level 9 -->
  <logging-config>
    <destination system-property="tangosol.coherence.log">stdout</destination>
    <severity-level system-property="tangosol.coherence.log.level">9</severity-level>
  </logging-config>
</coherence>
10. Create an Invocable class that extends AbstractInvocable
package com.coher.loader;
import com.tangosol.io.pof.annotation.Portable;
import com.tangosol.io.pof.annotation.PortableProperty;
import com.tangosol.net.AbstractInvocable;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import java.io.Serializable;
/**
 * Invocable task that carries an order-number range (from / to) to the member
 * it is executed on. Both bounds are annotated as portable properties.
 *
 * The current {@link #run()} body is a placeholder: it obtains the cache
 * "mycache" and prints the range it was asked to load.
 */
@Portable
public class LoaderAgent extends AbstractInvocable implements Serializable
{
    /** Lower bound of the order-number range, kept as text. */
    @PortableProperty
    private String mFromOrderNumber;

    /** Upper bound of the order-number range, kept as text. */
    @PortableProperty
    private String mToOrderNumber;

    /** No-argument constructor; leaves both bounds unset. */
    public LoaderAgent()
    {
    }

    /**
     * Creates a task for the given range.
     *
     * @param pFromOrderNumber first order number of the range
     * @param pToOrderNumber   last order number of the range
     */
    public LoaderAgent(String pFromOrderNumber, String pToOrderNumber)
    {
        mFromOrderNumber = pFromOrderNumber;
        mToOrderNumber = pToOrderNumber;
    }

    /** @return the first order number of the range */
    public String getFromOrderNumber()
    {
        return mFromOrderNumber;
    }

    /** @param pFromOrderNumber the first order number of the range */
    public void setFromOrderNumber(String pFromOrderNumber)
    {
        mFromOrderNumber = pFromOrderNumber;
    }

    /** @return the last order number of the range */
    public String getToOrderNumber()
    {
        return mToOrderNumber;
    }

    /** @param pToOrderNumber the last order number of the range */
    public void setToOrderNumber(String pToOrderNumber)
    {
        mToOrderNumber = pToOrderNumber;
    }

    /**
     * Obtains the cache "mycache" and prints the assigned range between two
     * separator lines. The actual query-and-load logic is still to be written.
     */
    public void run()
    {
        // The cache reference is obtained but not yet used by the placeholder body
        NamedCache targetCache = CacheFactory.getCache("mycache");
        // Write logic to query and load caches here
        final String separator = " ----------------------------------------------------------------------- ";
        final String message = "Loading Cache for Orders from " + mFromOrderNumber + " to " + mToOrderNumber;
        System.out.println(separator);
        System.out.println(message);
        System.out.println(separator);
    }
}
11. Create a class that can observe Execution that implements InvocationObserver
package com.coher.loader;
import com.tangosol.net.InvocationObserver;
import com.tangosol.net.Member;
/**
 * InvocationObserver that prints one line to standard output for every
 * notification it receives.
 */
public class LoaderObserver
        implements InvocationObserver
{
    /** Creates an observer; it holds no state. */
    public LoaderObserver()
    {
    }

    /**
     * Prints the member and the result object passed with the notification.
     *
     * @param member the member named in the notification
     * @param object the result passed with the notification
     */
    public void memberCompleted(Member member, Object object)
    {
        String line = String.format("Received membercompleted notification from Member: %s, Result: %s",
                member, object);
        System.out.println(line);
    }

    /**
     * Prints the id of the member and the throwable passed with the notification.
     *
     * @param member    the member named in the notification
     * @param throwable the failure passed with the notification
     */
    public void memberFailed(Member member, Throwable throwable)
    {
        String line = String.format("Received member failed notification from Member: %d, exception %s",
                member.getId(), throwable);
        System.out.println(line);
    }

    /**
     * Prints the id of the member named in the notification.
     *
     * @param member the member named in the notification
     */
    public void memberLeft(Member member)
    {
        String line = String.format("Received memberleft notification from Member: %d", member.getId());
        System.out.println(line);
    }

    /** Prints a fixed line for the invocation-completed notification. */
    public void invocationCompleted()
    {
        System.out.println("Received invocationcompleted notification");
    }
}
12. Create a class that can distribute load
package com.coher.loader;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;
import java.util.Collections;
import java.util.Iterator;
import java.util.Set;
/**
 * Client entry point that splits the order-number space into consecutive
 * ranges and sends one {@link LoaderAgent} per range to each service member
 * whose role is "CacheServer".
 */
public class LoaderAgentTest
{
    /** Role name a member must have to receive a load task. */
    private static final String CACHE_SERVER_ROLE = "CacheServer";

    /** Number of orders assigned to each cache server. */
    private static final int ORDERS_PER_MEMBER = 500;

    public LoaderAgentTest()
    {
        super();
    }

    /**
     * Looks up the service named "InvocationService", prints the role of every
     * service member, and executes a LoaderAgent on each "CacheServer" member
     * with a LoaderObserver attached.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args)
    {
        InvocationService service = (InvocationService) CacheFactory.getService("InvocationService");
        Set setMembers = service.getInfo().getServiceMembers();
        int rangeOffset = 0;
        Iterator it = setMembers.iterator();
        while (it.hasNext())
        {
            Member m = (Member) it.next();
            String role = m.getRoleName();
            System.out.println(role);
            // Constant-first comparison so a member without a role name is skipped, not an NPE
            if (!CACHE_SERVER_ROLE.equals(role))
            {
                continue;
            }
            LoaderAgent task = new LoaderAgent(String.valueOf(rangeOffset + 1),
                                               String.valueOf(rangeOffset + ORDERS_PER_MEMBER));
            service.execute(task, Collections.singleton(m), new LoaderObserver());
            // Advance only after a task was dispatched; advancing for every member
            // (as before) skipped a range for each proxy or client member
            rangeOffset += ORDERS_PER_MEMBER;
        }
    }
}
Start two instances of the Coherence cache server (by double-clicking cacheserver.cmd twice) and one instance of the proxy (cacheproxy1.cmd).
Below are the links to project Source and configuration files
Project Source zip
Run folder zip
Subscribe to:
Posts (Atom)