Thursday, June 20, 2013

Coherence Distributed Bulk Cache Loading using Invocation Service



When loading a large amount of data into a cache, it is more efficient to divide the load among the cache members. Below is a demonstration of how to do this with the Invocation Service.

My example assumes that you keep the files in the directory structure below:
-----\Coherence Extract Directory
          ---\applib
                  ---\CoherenceWork.jar
                  ---\projectLoader.jar
          ---\bin
          ---\doc
          ---\lib
          ---\run
                  ---\CacheConfig.xml
                  ---\CacheOverride.xml
                  ---\cacheproxy1.cmd
                  ---\CacheProxyOverride.xml
                  ---\cacheserver.cmd
                  ---\pof-config.xml
                  ---\ProxyConfig1.xml

1. Create cache operational config override file

CacheOverride.xml
<?xml version="1.0" ?>
<coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
      xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
      <cluster-config>
          <member-identity>
            <cluster-name system-property="tangosol.coherence.cluster">DataCluster</cluster-name>
            <process-name system-property="tangosol.coherence.process">CoherenceCacheServer</process-name>
           <role-name system-property="tangosol.coherence.role">CacheServer</role-name>
          </member-identity>
         <unicast-listener>
             <address system-property="tangosol.coherence.localhost">localhost</address>
             <port system-property="tangosol.coherence.localport">8088</port>            
             <port-auto-adjust system-property="tangosol.coherence.localport.adjust">true</port-auto-adjust>        
         </unicast-listener>        
  </cluster-config>
      <logging-config>
        <destination system-property="tangosol.coherence.log">stdout</destination>       
        <severity-level system-property="tangosol.coherence.log.level">9</severity-level>     
      </logging-config>
</coherence>
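
Most elements in this file carry a system-property attribute. That means a JVM system property of the same name, if set (for example with -Dtangosol.coherence.cluster=OtherCluster on the command line), takes precedence over the value written in the XML. The rule can be sketched in plain Java as below. The OverrideLookup class and its resolve method are illustrative names only, and this is not Coherence's own configuration code.

```java
import java.util.Properties;

// Illustrative sketch only: mimics the "system property wins over the
// XML value" rule. It is not Coherence's configuration code.
public class OverrideLookup {

    // Returns the system property if it is set, otherwise the value from the XML file.
    public static String resolve(Properties systemProps, String propertyName, String xmlValue) {
        return systemProps.getProperty(propertyName, xmlValue);
    }

    public static void main(String[] args) {
        Properties props = System.getProperties();
        // Falls back to the values from CacheOverride.xml unless -D flags are given
        System.out.println(resolve(props, "tangosol.coherence.cluster", "DataCluster"));
        System.out.println(resolve(props, "tangosol.coherence.localport", "8088"));
    }
}
```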

 
2. Let's create a cache config file, CacheConfig.xml
 
<?xml version="1.0" ?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                      xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
                      xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
            <scope-name>OneSystemScope</scope-name>
            <defaults>
              <serializer>pof</serializer>
            </defaults>
            <caching-scheme-mapping>
                <cache-mapping>
                    <cache-name>mycache</cache-name>
                    <scheme-name>dist-default</scheme-name>
                    <init-params>
                      <init-param>
                        <param-name>size-limit</param-name>
                        <param-value>5000</param-value>
                      </init-param>
                    </init-params>
                </cache-mapping>           
            </caching-scheme-mapping>
            <caching-schemes>
                <distributed-scheme>
                  <scheme-name>dist-default</scheme-name>
                  <service-name>DistributedCache</service-name>
                  <thread-count>5</thread-count>
                  <backup-count>1</backup-count>            
                  <backup-storage>
                    <type>file-mapped</type>
                    <initial-size>1M</initial-size>
                    <maximum-size>5G</maximum-size>
                    <directory>/software/CoherenceInstall/coherence/backup</directory>
                  </backup-storage>
                  <backing-map-scheme>
                    <overflow-scheme>
                      <scheme-name>LocalMemoryWithDiskOverflow</scheme-name>
                      <front-scheme>                  
                        <local-scheme>                     
                            <high-units>300000</high-units>
                        </local-scheme>
                      </front-scheme>
                      <back-scheme>
                        <external-scheme>
                          <scheme-name>DiskScheme</scheme-name>
                          <nio-file-manager>
                            <initial-size>1MB</initial-size>
                            <maximum-size>1024MB</maximum-size>
                            <directory>/software/CoherenceInstall/coherence/backup</directory>
                          </nio-file-manager>
                        </external-scheme>
                      </back-scheme>
                    </overflow-scheme>
                  </backing-map-scheme>
                  <autostart>true</autostart>
                </distributed-scheme>
                <invocation-scheme>
                  <scheme-name>InvocationService</scheme-name>
                  <service-name>InvocationService</service-name>
                  <thread-count>5</thread-count>
                  <autostart>true</autostart>
                </invocation-scheme>
            </caching-schemes>      
</cache-config>

 
3. Create a cmd script to start the cache instances, cacheserver.cmd
 
@echo off
@
@rem This will start a cache server
@
setlocal

:config
@rem specify the Coherence installation directory
set coherence_home=%~dp0\..

@rem specify the JVM heap size
set memory=512m

:start
if not exist "%coherence_home%\lib\coherence.jar" goto instructions

if "%java_home%"=="" (set java_exec=java) else (set java_exec=%java_home%\bin\java)
:launch
if "%1"=="-jmx" (
    set jmxproperties=-Dcom.sun.management.jmxremote -Dtangosol.coherence.management=all -Dcom.sun.management.jmxremote.port=7771 -Dtangosol.coherence.management.remote=true
    shift 
)   

set java_opts=-Xms%memory% -Xmx%memory% %jmxproperties%
%java_exec% -server -showversion %java_opts% -Dtangosol.coherence.override=CacheOverride.xml -Dtangosol.coherence.distributed.localstorage=true -Dtangosol.coherence.cacheconfig=CacheConfig.xml -Dtangosol.coherence.edition=EE -cp "%coherence_home%\lib\coherence.jar";"%coherence_home%\applib\projectLoader.jar";"%coherence_home%\applib\CoherenceWork.jar" com.tangosol.net.DefaultCacheServer %1
goto exit
:instructions
echo Usage:
echo   ^<coherence_home^>\bin\cache-server.cmd
goto exit

:exit
endlocal
@echo on

 
4. Create an override file for the proxy, CacheProxyOverride.xml
 
<?xml version="1.0" ?>
<coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
      xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
      <cluster-config>
          <member-identity>
            <cluster-name system-property="tangosol.coherence.cluster">DataCluster</cluster-name>
            <process-name system-property="tangosol.coherence.process">CoherenceCacheServer</process-name>
           <role-name system-property="tangosol.coherence.role">CacheProxy</role-name>
          </member-identity>
         <unicast-listener>
             <address system-property="tangosol.coherence.localhost">localhost</address>
             <port system-property="tangosol.coherence.localport">8088</port>            
             <port-auto-adjust system-property="tangosol.coherence.localport.adjust">true</port-auto-adjust>        
         </unicast-listener>        
  </cluster-config>
      <logging-config>
        <destination system-property="tangosol.coherence.log">stdout</destination>       
        <severity-level system-property="tangosol.coherence.log.level">9</severity-level>     
      </logging-config>
</coherence>

 
5. Create a cache config for the proxy, ProxyConfig1.xml
 
<?xml version="1.0" ?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
                      xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
                      xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
            <scope-name>OneSystemScope</scope-name>
            <defaults>
              <serializer>pof</serializer>
            </defaults>
            <caching-scheme-mapping>
                <cache-mapping>
                    <cache-name>mycache</cache-name>
                    <scheme-name>dist-default</scheme-name>
                    <init-params>
                      <init-param>
                        <param-name>size-limit</param-name>
                        <param-value>5000</param-value>
                      </init-param>
                    </init-params>
                </cache-mapping>           
            </caching-scheme-mapping>
            <caching-schemes>
            <distributed-scheme>
                  <scheme-name>dist-default</scheme-name>
                  <service-name>DistributedCache</service-name>
                  <thread-count>5</thread-count>
                  <backup-count>1</backup-count>            
                  <backup-storage>
                    <type>file-mapped</type>
                    <initial-size>1M</initial-size>
                    <maximum-size>5G</maximum-size>
                    <directory>/software/CoherenceInstall/coherence/backup</directory>
                  </backup-storage>
                  <backing-map-scheme>               
                    <overflow-scheme>
                      <scheme-name>LocalMemoryWithDiskOverflow</scheme-name>
                      <front-scheme>                  
                        <local-scheme>                     
                            <high-units>300000</high-units>
                        </local-scheme>
                      </front-scheme>
                      <back-scheme>
                        <external-scheme>
                          <scheme-name>DiskScheme</scheme-name>
                          <nio-file-manager>
                            <initial-size>1MB</initial-size>
                            <maximum-size>1024MB</maximum-size>
                            <directory>/software/CoherenceInstall/coherence/backup</directory>
                          </nio-file-manager>
                        </external-scheme>
                      </back-scheme>
                    </overflow-scheme>
                  </backing-map-scheme>
                  <autostart>true</autostart>
            </distributed-scheme>   
            <proxy-scheme>                 
                  <service-name>ExtendTcpProxyService</service-name>
                  <thread-count>5</thread-count>
                  <acceptor-config>
                    <tcp-acceptor>
                     <local-address>
                       <address>localhost</address>
                       <port>9098</port>
                     </local-address>
                    </tcp-acceptor>
                    <serializer>                     
                    <instance>                     
                    <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
                    <init-params>                       
                        <init-param>                         
                            <param-type>String</param-type>                         
                            <param-value>pof-config.xml</param-value>
                        </init-param>
                    </init-params>
                    </instance>
                    </serializer>                 
                    </acceptor-config>
                    <proxy-config>
                        <cache-service-proxy>
                            <enabled>true</enabled>
                        </cache-service-proxy>
                        <invocation-service-proxy>
                            <enabled>true</enabled>
                        </invocation-service-proxy>
                    </proxy-config>
                    <autostart>true</autostart>               
        </proxy-scheme> 
        <invocation-scheme>
          <scheme-name>InvocationService</scheme-name>
          <service-name>InvocationService</service-name>
          <thread-count>5</thread-count>
          <autostart>true</autostart>
        </invocation-scheme>       
        </caching-schemes>       
</cache-config>
               
 
6. Create a command script to start the proxy server, cacheproxy1.cmd

@echo off
@
@rem This will start a proxy server
@
setlocal

:config
@rem specify the Coherence installation directory
set coherence_home=%~dp0\..

@rem specify the JVM heap size
set memory=512m

:start
if not exist "%coherence_home%\lib\coherence.jar" goto instructions

if "%java_home%"=="" (set java_exec=java) else (set java_exec=%java_home%\bin\java)
:launch
if "%1"=="-jmx" (
    set jmxproperties=-Dcom.sun.management.jmxremote -Dtangosol.coherence.management=all -Dcom.sun.management.jmxremote.port=7772 -Dtangosol.coherence.management.remote=true
    shift 
)   

set java_opts=-Xms%memory% -Xmx%memory% %jmxproperties%
%java_exec% -server -showversion %java_opts% -Dtangosol.coherence.override=CacheProxyOverride.xml -Dtangosol.coherence.distributed.localstorage=false -Dtangosol.coherence.cacheconfig=ProxyConfig1.xml -Dtangosol.coherence.edition=EE -cp "%coherence_home%\lib\coherence.jar";"%coherence_home%\applib\CoherenceWork.jar";"%coherence_home%\applib\projectLoader.jar" com.tangosol.net.DefaultCacheServer %1
goto exit
:instructions
echo Usage:
echo   ^<coherence_home^>\bin\cache-server.cmd
goto exit

:exit
endlocal
@echo on



Now that all the cache cluster configurations are ready, we need to write the client programs and configurations.

7. Create a pof-config.xml and place a copy of it in the run directory
<?xml version="1.0" encoding="UTF-8" ?>
<pof-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
            xmlns="http://xmlns.oracle.com/coherence/coherence-pof-config"
            xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-pof-config coherence-pof-config.xsd">
  <user-type-list>
    <include>coherence-pof-config.xml</include>
    <user-type>
      <type-id>1001</type-id>
      <class-name>com.coher.dto.OrderBean</class-name>
      <serializer>
        <class-name>com.tangosol.io.pof.PofAnnotationSerializer</class-name>
        <init-params>
          <init-param>
            <param-type>int</param-type>
            <param-value>{type-id}</param-value>
          </init-param>
          <init-param>
            <param-type>java.lang.Class</param-type>
            <param-value>{class}</param-value>
          </init-param>
          <init-param>
            <param-type>boolean</param-type>
            <param-value>true</param-value>
          </init-param>
        </init-params>
      </serializer>
    </user-type>
    <user-type>
      <type-id>1002</type-id>
      <class-name>com.coher.loader.LoaderAgent</class-name>
      <serializer>
        <class-name>com.tangosol.io.pof.PofAnnotationSerializer</class-name>
        <init-params>
          <init-param>
            <param-type>int</param-type>
            <param-value>{type-id}</param-value>
          </init-param>
          <init-param>
            <param-type>java.lang.Class</param-type>
            <param-value>{class}</param-value>
          </init-param>
          <init-param>
            <param-type>boolean</param-type>
            <param-value>true</param-value>
          </init-param>
        </init-params>
      </serializer>
    </user-type>
  </user-type-list>
  <allow-interfaces>true</allow-interfaces>
  <allow-subclasses>true</allow-subclasses>
</pof-config>


8. Create a local cache config

<?xml version="1.0" ?>
<cache-config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
              xmlns="http://xmlns.oracle.com/coherence/coherence-cache-config"
              xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-cache-config coherence-cache-config.xsd">
  <scope-name>OneSystemScope</scope-name>
  <defaults>
    <serializer>pof</serializer>
  </defaults>
  <caching-scheme-mapping>
    <cache-mapping>
      <cache-name>mycache</cache-name>
      <scheme-name>extend-dist</scheme-name>
      <init-params>
        <init-param>
          <param-name>size-limit</param-name>
          <param-value>5000</param-value>
        </init-param>
      </init-params>
    </cache-mapping>
  </caching-scheme-mapping>
  <caching-schemes>
    <remote-cache-scheme>
      <scheme-name>extend-dist</scheme-name>
      <service-name>ExtendTcpCacheService</service-name>
      <initiator-config>
        <tcp-initiator>
          <remote-addresses>
            <!-- include a socket-address for each distributed tier cluster node -->
            <socket-address>
              <address>10.10.2.106</address>
              <port>9098</port>
            </socket-address>
            <socket-address>
              <address>10.10.2.106</address>
              <port>9099</port>
            </socket-address>
          </remote-addresses>
          <connect-timeout>10s</connect-timeout>
        </tcp-initiator>
        <outgoing-message-handler>
          <request-timeout>60s</request-timeout>
        </outgoing-message-handler>
        <serializer>
          <instance>
            <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
            <init-params>
              <init-param>
                <param-type>java.lang.String</param-type>
                <param-value>pof-config.xml</param-value>
              </init-param>
            </init-params>
          </instance>
        </serializer>
      </initiator-config>
    </remote-cache-scheme>
    <!-- the remote invocation scheme defines the Extend connection through
            which this client can run Invocable tasks on the cluster via the
            proxy. -->
    <remote-invocation-scheme>
      <scheme-name>extend-invocation</scheme-name>
      <service-name>ExtendTcpInvocationService</service-name>
      <initiator-config>
        <tcp-initiator>
          <remote-addresses>
            <!-- include a socket-address for each distributed tier cluster node -->
            <socket-address>
              <address>10.10.2.106</address>
              <port>9098</port>
            </socket-address>
            <socket-address>
              <address>10.10.2.106</address>
              <port>9099</port>
            </socket-address>
          </remote-addresses>
          <connect-timeout>60s</connect-timeout>
        </tcp-initiator>
        <outgoing-message-handler>
          <request-timeout>20s</request-timeout>
        </outgoing-message-handler>
        <serializer>
          <instance>
            <class-name>com.tangosol.io.pof.ConfigurablePofContext</class-name>
            <init-params>
              <init-param>
                <param-type>java.lang.String</param-type>
                <param-value>pof-config.xml</param-value>
              </init-param>
            </init-params>
          </instance>
        </serializer>
      </initiator-config>
    </remote-invocation-scheme>
    <invocation-scheme>
      <scheme-name>InvocationService</scheme-name>
      <service-name>InvocationService</service-name>
      <thread-count>5</thread-count>
      <autostart>true</autostart>
    </invocation-scheme>
  </caching-schemes>
</cache-config>  
            

9. Create a local Cache Override file

<?xml version="1.0" ?>
<coherence xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
           xmlns="http://xmlns.oracle.com/coherence/coherence-operational-config"
           xsi:schemaLocation="http://xmlns.oracle.com/coherence/coherence-operational-config coherence-operational-config.xsd">
  <cluster-config>
    <member-identity>
      <cluster-name system-property="tangosol.coherence.cluster">DataCluster</cluster-name>
      <process-name system-property="tangosol.coherence.process">CoherenceCacheServer</process-name>
      <role-name system-property="tangosol.coherence.role">CacheClient</role-name>
    </member-identity>
  </cluster-config>
  <logging-config>
    <destination system-property="tangosol.coherence.log">stdout</destination>
    <severity-level system-property="tangosol.coherence.log.level">9</severity-level>
  </logging-config>
</coherence>


10. Create an Invocable class that extends AbstractInvocable

package com.coher.loader;
import com.tangosol.io.pof.annotation.Portable;
import com.tangosol.io.pof.annotation.PortableProperty;
import com.tangosol.net.AbstractInvocable;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.NamedCache;
import java.io.Serializable;
@Portable
public class LoaderAgent extends AbstractInvocable implements  Serializable
{
  @PortableProperty
  private String mFromOrderNumber;

  @PortableProperty
  private String mToOrderNumber;

  public LoaderAgent()
  {
    super();
  }
  public LoaderAgent(String pFromOrderNumber, String pToOrderNumber)
  {
    super();
    this.mFromOrderNumber = pFromOrderNumber;
    this.mToOrderNumber = pToOrderNumber;
  }
  public void run()
  {
    NamedCache cache = CacheFactory.getCache("mycache");
    // Write logic to query and load caches here
    System.out.println(" ----------------------------------------------------------------------- ");
    System.out.println("Loading Cache for Orders from " +mFromOrderNumber+ " to "+ mToOrderNumber);   
    System.out.println(" ----------------------------------------------------------------------- ");
  }
  public void setFromOrderNumber(String pFromOrderNumber)
  {
    this.mFromOrderNumber = pFromOrderNumber;
  }
  public String getFromOrderNumber()
  {
    return mFromOrderNumber;
  }
  public void setToOrderNumber(String pToOrderNumber)
  {
    this.mToOrderNumber = pToOrderNumber;
  }
  public String getToOrderNumber()
  {
    return mToOrderNumber;
  }
}
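
The run() method above leaves the actual loading as a comment. One common approach is to collect entries into a map and write them with putAll in batches, rather than issuing one put per entry, which typically cuts down on network round trips. The sketch below uses a plain java.util.Map as a stand-in for the cache (NamedCache is itself a java.util.Map). BatchLoader, loadOrder and the batch size are hypothetical names and values chosen for illustration, and order numbers are assumed to be integers.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch only: BatchLoader and loadOrder are hypothetical, and the target
// map stands in for the NamedCache obtained inside run().
public class BatchLoader {

    // Hypothetical data-source lookup; a real loader would query a database.
    public static String loadOrder(int orderNumber) {
        return "order-" + orderNumber;
    }

    // Loads orders in the inclusive range [from, to] into target, flushing
    // every batchSize entries. Returns the number of putAll calls made.
    public static int load(Map<String, String> target, int from, int to, int batchSize) {
        Map<String, String> buffer = new HashMap<>();
        int flushes = 0;
        for (int n = from; n <= to; n++) {
            buffer.put(String.valueOf(n), loadOrder(n));
            if (buffer.size() == batchSize) {
                target.putAll(buffer);   // one bulk write per full batch
                buffer.clear();
                flushes++;
            }
        }
        if (!buffer.isEmpty()) {         // write the final partial batch
            target.putAll(buffer);
            flushes++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        Map<String, String> cache = new HashMap<>();
        int flushes = load(cache, 1, 500, 200);
        System.out.println(cache.size() + " entries written in " + flushes + " batches");
    }
}
```

Inside LoaderAgent.run(), the same loop could write to the NamedCache returned by CacheFactory.getCache("mycache"), with mFromOrderNumber and mToOrderNumber parsed into the range bounds.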

11. Create a class that implements InvocationObserver to observe the execution

package com.coher.loader;
import com.tangosol.net.InvocationObserver;
import com.tangosol.net.Member;

public class LoaderObserver
  implements InvocationObserver
{
  public LoaderObserver()
  {
    super();
  }

  public void memberCompleted(Member member, Object object)
  {
    System.out.println("Received membercompleted notification from Member: " + member + ", Result: " + object);
  }

  public void memberFailed(Member member, Throwable throwable)
  {
    System.out.println("Received member failed notification from Member: " + member.getId() + ", exception " + throwable);
  }

  public void memberLeft(Member member)
  {
    System.out.println("Received memberleft notification from Member: " + member.getId());
  }

  public void invocationCompleted()
  {
    System.out.println("Received invocationcompleted notification");
  }
}
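
Because InvocationService.execute is asynchronous, the submitting program usually needs a way to wait until every member has reported back. One option is to have the observer count down a java.util.concurrent.CountDownLatch from its callbacks. The CompletionTracker class below is a hypothetical, Coherence-free sketch of that bookkeeping; in a real observer its methods would be called from memberCompleted, memberFailed and memberLeft.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of Coherence: tracks how many members have
// reported so the submitting thread can block until all of them are done.
public class CompletionTracker {
    private final CountDownLatch latch;
    private final AtomicInteger failures = new AtomicInteger();

    public CompletionTracker(int expectedMembers) {
        this.latch = new CountDownLatch(expectedMembers);
    }

    // Intended to be called from memberCompleted(...)
    public void completed() {
        latch.countDown();
    }

    // Intended to be called from memberFailed(...) or memberLeft(...)
    public void failed() {
        failures.incrementAndGet();
        latch.countDown();
    }

    // Blocks until every member has reported or the timeout elapses.
    // Returns false on timeout or if the waiting thread is interrupted.
    public boolean await(long timeout, TimeUnit unit) {
        try {
            return latch.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public int failureCount() {
        return failures.get();
    }

    public static void main(String[] args) {
        CompletionTracker tracker = new CompletionTracker(2);
        new Thread(tracker::completed).start();   // simulates one member finishing
        new Thread(tracker::failed).start();      // simulates one member failing
        boolean done = tracker.await(5, TimeUnit.SECONDS);
        System.out.println("all reported: " + done + ", failures: " + tracker.failureCount());
    }
}
```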


12. Create a class that distributes the load

package com.coher.loader;
import com.tangosol.net.CacheFactory;
import com.tangosol.net.InvocationService;
import com.tangosol.net.Member;

import java.util.Collections;
import java.util.Iterator;
import java.util.Set;

public class LoaderAgentTest
{
  public LoaderAgentTest()
  {
    super();
  }

  public static void main(String[] args)
  {
    InvocationService service = (InvocationService) CacheFactory.getService("InvocationService");
    Set setMembers = service.getInfo().getServiceMembers();
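    int counter = 0;
    for (Iterator it = setMembers.iterator() ; it.hasNext(); )
    {
      Member m = (Member)it.next();
      System.out.println(m.getRoleName());
      // Only members with the CacheServer role should load data
      if ("CacheServer".equals(m.getRoleName()))
      {
        // Each cache server gets the next block of 500 order numbers
        LoaderAgent task = new LoaderAgent(String.valueOf(counter + 1), String.valueOf(counter + 500));
        service.execute(task, Collections.singleton(m),
                        new LoaderObserver());
        // Advance only after assigning a block, so proxy and client
        // members do not leave gaps in the order ranges
        counter = counter + 500;
      }
    }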
  }

}
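
The loop above hands each cache server a fixed block of 500 order numbers. When the total range is known up front, it can instead be divided evenly across however many CacheServer members are present. The sketch below shows only the arithmetic, in plain Java. RangeSplitter and its split method are hypothetical names, and order numbers are assumed to be contiguous integers.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper: divides an order-number range among N members.
public class RangeSplitter {

    // Splits the inclusive range [from, to] into at most `parts` contiguous
    // sub-ranges whose sizes differ by at most one. Each element is {start, end}.
    public static List<int[]> split(int from, int to, int parts) {
        if (parts <= 0 || to < from) {
            throw new IllegalArgumentException("need parts > 0 and to >= from");
        }
        List<int[]> ranges = new ArrayList<>();
        int total = to - from + 1;
        int base = total / parts;
        int extra = total % parts;   // the first `extra` ranges get one more item
        int start = from;
        for (int i = 0; i < parts && start <= to; i++) {
            int size = base + (i < extra ? 1 : 0);
            int end = start + size - 1;
            ranges.add(new int[] {start, end});
            start = end + 1;
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Example: 1000 orders shared by two cache servers
        for (int[] r : split(1, 1000, 2)) {
            System.out.println("from " + r[0] + " to " + r[1]);
        }
    }
}
```

Each {start, end} pair could then be passed to a LoaderAgent as String.valueOf(start) and String.valueOf(end) for one CacheServer member.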
 
 
 
 
Start two instances of the Coherence cache server (by double-clicking cacheserver.cmd twice) and one instance of the proxy (cacheproxy1.cmd). Then run LoaderAgentTest.
 
 
 
Below are the links to the project source and configuration files:
Project Source zip
Run folder zip