View Javadoc
1   /*
2    * Copyright (C) 2003-2012 David E. Berry
3    *
4    * This library is free software; you can redistribute it and/or
5    * modify it under the terms of the GNU Lesser General Public
6    * License as published by the Free Software Foundation; either
7    * version 2.1 of the License, or (at your option) any later version.
8    *
9    * This library is distributed in the hope that it will be useful,
10   * but WITHOUT ANY WARRANTY; without even the implied warranty of
11   * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
12   * Lesser General Public License for more details.
13   *
14   * You should have received a copy of the GNU Lesser General Public
15   * License along with this library; if not, write to the Free Software
16   * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  02110-1301  USA
17   *
18   * A copy of the GNU Lesser General Public License may also be found at
19   * http://www.gnu.org/licenses/lgpl.txt
20   */
21  package org.synchronoss.cpo.cassandra.config;
22  
23  import com.datastax.driver.core.*;
24  import com.datastax.driver.core.policies.LoadBalancingPolicy;
25  import com.datastax.driver.core.policies.ReconnectionPolicy;
26  import com.datastax.driver.core.policies.RetryPolicy;
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  import org.synchronoss.cpo.CpoAdapter;
30  import org.synchronoss.cpo.CpoException;
31  import org.synchronoss.cpo.cassandra.CassandraCpoAdapter;
32  import org.synchronoss.cpo.cassandra.ClusterDataSourceInfo;
33  import org.synchronoss.cpo.cassandra.cpoCassandraConfig.CtCassandraConfig;
34  import org.synchronoss.cpo.cassandra.cpoCassandraConfig.CtCassandraReadWriteConfig;
35  import org.synchronoss.cpo.cassandra.meta.CassandraCpoMetaDescriptor;
36  import org.synchronoss.cpo.config.CpoConfigProcessor;
37  import org.synchronoss.cpo.core.cpoCoreConfig.CtDataSourceConfig;
38  import org.synchronoss.cpo.meta.CpoMetaDescriptor;
39  
40  import java.util.ArrayList;
41  
42  /**
43   * CassandraCpoConfigProcessor processes the datasource configuration file for cassandra. It pulls out all the information needed to configure a cluster for use
44   * within the application.
45   *
46   * User: dberry
47   * Date: 9/10/13
48   * Time: 08:20 AM
49   * To change this template use File | Settings | File Templates.
50   */
51  public class CassandraCpoConfigProcessor implements CpoConfigProcessor {
52    private static final Logger logger = LoggerFactory.getLogger(CassandraCpoConfigProcessor.class);
53  
54    @Override
55    public CpoAdapter processCpoConfig(CtDataSourceConfig cpoConfig) throws CpoException {
56      CpoAdapter cpoAdapter = null;
57  
58      if (cpoConfig == null || !(cpoConfig instanceof CtCassandraConfig)) {
59        throw new CpoException("Invalid Jdbc Configuration Information");
60      }
61  
62      CtCassandraConfig cassandraConfig = (CtCassandraConfig) cpoConfig;
63  
64      CassandraCpoMetaDescriptor metaDescriptor = (CassandraCpoMetaDescriptor) CpoMetaDescriptor.getInstance(cassandraConfig.getMetaDescriptorName());
65  
66      // build the cluster information
67      if (cassandraConfig.isSetReadWriteConfig()) {
68        ClusterDataSourceInfo clusterInfo = buildDataSourceInfo(cassandraConfig.getName(), cassandraConfig.getReadWriteConfig());
69        cpoAdapter = CassandraCpoAdapter.getInstance(metaDescriptor, clusterInfo);
70      } else {
71        ClusterDataSourceInfo readClusterInfo = buildDataSourceInfo(cassandraConfig.getName(), cassandraConfig.getReadConfig());
72        ClusterDataSourceInfo writeClusterInfo = buildDataSourceInfo(cassandraConfig.getName(), cassandraConfig.getWriteConfig());
73        cpoAdapter = CassandraCpoAdapter.getInstance(metaDescriptor, writeClusterInfo, readClusterInfo);
74      }
75      logger.debug("Adapter Datasourcename ="+cpoAdapter.getDataSourceName());
76  
77      return cpoAdapter;
78  
79    }
80  
81    private ClusterDataSourceInfo buildDataSourceInfo(String dataConfigName, CtCassandraReadWriteConfig readWriteConfig) throws CpoException {
82      ClusterDataSourceInfo clusterInfo = new ClusterDataSourceInfo(dataConfigName, readWriteConfig.getKeySpace(), readWriteConfig.getContactPointArray());
83  
84      // add clusterName
85      if(readWriteConfig.isSetClusterName())
86        clusterInfo.setClusterName(readWriteConfig.getClusterName());
87  
88      // add port
89      if (readWriteConfig.isSetPort())
90        clusterInfo.setPort(readWriteConfig.getPort());
91  
92      // add loadBalancing
93      if (readWriteConfig.isSetLoadBalancingPolicy()) {
94        clusterInfo.setLoadBalancingPolicy(new ConfigInstantiator<LoadBalancingPolicy>().instantiate(readWriteConfig.getLoadBalancingPolicy()));
95      }
96  
97      // add reconnectionPolicy
98      if (readWriteConfig.isSetReconnectionPolicy())
99        clusterInfo.setReconnectionPolicy(new ConfigInstantiator<ReconnectionPolicy>().instantiate(readWriteConfig.getReconnectionPolicy()));
100 
101     // add retryPolicy
102     if (readWriteConfig.isSetRetryPolicy())
103       clusterInfo.setRetryPolicy(new ConfigInstantiator<RetryPolicy>().instantiate(readWriteConfig.getRetryPolicy()));
104 
105     // add credentials
106     if (readWriteConfig.isSetCredentials()) {
107       clusterInfo.setHasCredentials(true);
108       clusterInfo.setUserName(readWriteConfig.getCredentials().getUser());
109       clusterInfo.setPassword(readWriteConfig.getCredentials().getUser());
110     }
111 
112     // add AuthProvider
113 //    if (readWriteConfig.isSetAuthProvider())
114 //      clusterInfo.setAuthProvider(new ConfigInstantiator<AuthProvider>().instantiate(readWriteConfig.getAuthProvider()));
115 
116     // add Compression
117     if (readWriteConfig.isSetCompression())
118       clusterInfo.setCompressionType(ProtocolOptions.Compression.valueOf(readWriteConfig.getCompression().toString()));
119 
120     // add Metrics
121     if (readWriteConfig.isSetMetrics())
122       clusterInfo.setUseMetrics(readWriteConfig.getMetrics());
123 
124     // add SSL
125     if (readWriteConfig.isSetSslOptions())
126       clusterInfo.setSslOptions(new ConfigInstantiator<SSLOptions>().instantiate(readWriteConfig.getSslOptions()));
127 
128     // add Listeners
129     if (readWriteConfig.getInitialListenersArray().length>0){
130       ArrayList<Host.StateListener> listeners = new ArrayList<>();
131       for (String s : readWriteConfig.getInitialListenersArray()) {
132         listeners.add(new ConfigInstantiator<Host.StateListener>().instantiate(s));
133       }
134       clusterInfo.setListeners(listeners);
135     }
136 
137     // add JMX Reporting
138     if (readWriteConfig.isSetJmxReporting())
139       clusterInfo.setUseJmxReporting(readWriteConfig.getJmxReporting());
140 
141     // add pooling options
142     if (readWriteConfig.isSetPoolingOptions())
143       clusterInfo.setPoolingOptions(new ConfigInstantiator<PoolingOptions>().instantiate(readWriteConfig.getPoolingOptions()));
144 
145     // add socket options
146     if (readWriteConfig.isSetSocketOptions())
147       clusterInfo.setSocketOptions(new ConfigInstantiator<SocketOptions>().instantiate(readWriteConfig.getSocketOptions()));
148 
149     // TODO: Add back in when add query options
150 //    if (readWriteConfig.isSetQueryOptions())
151 //      clusterInfo.setQueryOptions(new ConfigInstantiator<QueryOptions>().instantiate(readWriteConfig.getQueryOptions()));
152 
153 
154     logger.debug("Created DataSourceInfo: " + clusterInfo);
155     return clusterInfo;
156   }
157 
158 }