1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21 package org.synchronoss.cpo.cassandra.config;
22
23 import com.datastax.driver.core.*;
24 import com.datastax.driver.core.policies.LoadBalancingPolicy;
25 import com.datastax.driver.core.policies.ReconnectionPolicy;
26 import com.datastax.driver.core.policies.RetryPolicy;
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import org.synchronoss.cpo.CpoAdapter;
30 import org.synchronoss.cpo.CpoException;
31 import org.synchronoss.cpo.cassandra.CassandraCpoAdapter;
32 import org.synchronoss.cpo.cassandra.ClusterDataSourceInfo;
33 import org.synchronoss.cpo.cassandra.cpoCassandraConfig.CtCassandraConfig;
34 import org.synchronoss.cpo.cassandra.cpoCassandraConfig.CtCassandraReadWriteConfig;
35 import org.synchronoss.cpo.cassandra.meta.CassandraCpoMetaDescriptor;
36 import org.synchronoss.cpo.config.CpoConfigProcessor;
37 import org.synchronoss.cpo.core.cpoCoreConfig.CtDataSourceConfig;
38 import org.synchronoss.cpo.meta.CpoMetaDescriptor;
39
40 import java.util.ArrayList;
41
42
43
44
45
46
47
48
49
50
51 public class CassandraCpoConfigProcessor implements CpoConfigProcessor {
52 private static final Logger logger = LoggerFactory.getLogger(CassandraCpoConfigProcessor.class);
53
54 @Override
55 public CpoAdapter processCpoConfig(CtDataSourceConfig cpoConfig) throws CpoException {
56 CpoAdapter cpoAdapter = null;
57
58 if (cpoConfig == null || !(cpoConfig instanceof CtCassandraConfig)) {
59 throw new CpoException("Invalid Jdbc Configuration Information");
60 }
61
62 CtCassandraConfig cassandraConfig = (CtCassandraConfig) cpoConfig;
63
64 CassandraCpoMetaDescriptor metaDescriptor = (CassandraCpoMetaDescriptor) CpoMetaDescriptor.getInstance(cassandraConfig.getMetaDescriptorName());
65
66
67 if (cassandraConfig.isSetReadWriteConfig()) {
68 ClusterDataSourceInfo clusterInfo = buildDataSourceInfo(cassandraConfig.getName(), cassandraConfig.getReadWriteConfig());
69 cpoAdapter = CassandraCpoAdapter.getInstance(metaDescriptor, clusterInfo);
70 } else {
71 ClusterDataSourceInfo readClusterInfo = buildDataSourceInfo(cassandraConfig.getName(), cassandraConfig.getReadConfig());
72 ClusterDataSourceInfo writeClusterInfo = buildDataSourceInfo(cassandraConfig.getName(), cassandraConfig.getWriteConfig());
73 cpoAdapter = CassandraCpoAdapter.getInstance(metaDescriptor, writeClusterInfo, readClusterInfo);
74 }
75 logger.debug("Adapter Datasourcename ="+cpoAdapter.getDataSourceName());
76
77 return cpoAdapter;
78
79 }
80
81 private ClusterDataSourceInfo buildDataSourceInfo(String dataConfigName, CtCassandraReadWriteConfig readWriteConfig) throws CpoException {
82 ClusterDataSourceInfo clusterInfo = new ClusterDataSourceInfo(dataConfigName, readWriteConfig.getKeySpace(), readWriteConfig.getContactPointArray());
83
84
85 if(readWriteConfig.isSetClusterName())
86 clusterInfo.setClusterName(readWriteConfig.getClusterName());
87
88
89 if (readWriteConfig.isSetPort())
90 clusterInfo.setPort(readWriteConfig.getPort());
91
92
93 if (readWriteConfig.isSetLoadBalancingPolicy()) {
94 clusterInfo.setLoadBalancingPolicy(new ConfigInstantiator<LoadBalancingPolicy>().instantiate(readWriteConfig.getLoadBalancingPolicy()));
95 }
96
97
98 if (readWriteConfig.isSetReconnectionPolicy())
99 clusterInfo.setReconnectionPolicy(new ConfigInstantiator<ReconnectionPolicy>().instantiate(readWriteConfig.getReconnectionPolicy()));
100
101
102 if (readWriteConfig.isSetRetryPolicy())
103 clusterInfo.setRetryPolicy(new ConfigInstantiator<RetryPolicy>().instantiate(readWriteConfig.getRetryPolicy()));
104
105
106 if (readWriteConfig.isSetCredentials()) {
107 clusterInfo.setHasCredentials(true);
108 clusterInfo.setUserName(readWriteConfig.getCredentials().getUser());
109 clusterInfo.setPassword(readWriteConfig.getCredentials().getUser());
110 }
111
112
113
114
115
116
117 if (readWriteConfig.isSetCompression())
118 clusterInfo.setCompressionType(ProtocolOptions.Compression.valueOf(readWriteConfig.getCompression().toString()));
119
120
121 if (readWriteConfig.isSetMetrics())
122 clusterInfo.setUseMetrics(readWriteConfig.getMetrics());
123
124
125 if (readWriteConfig.isSetSslOptions())
126 clusterInfo.setSslOptions(new ConfigInstantiator<SSLOptions>().instantiate(readWriteConfig.getSslOptions()));
127
128
129 if (readWriteConfig.getInitialListenersArray().length>0){
130 ArrayList<Host.StateListener> listeners = new ArrayList<>();
131 for (String s : readWriteConfig.getInitialListenersArray()) {
132 listeners.add(new ConfigInstantiator<Host.StateListener>().instantiate(s));
133 }
134 clusterInfo.setListeners(listeners);
135 }
136
137
138 if (readWriteConfig.isSetJmxReporting())
139 clusterInfo.setUseJmxReporting(readWriteConfig.getJmxReporting());
140
141
142 if (readWriteConfig.isSetPoolingOptions())
143 clusterInfo.setPoolingOptions(new ConfigInstantiator<PoolingOptions>().instantiate(readWriteConfig.getPoolingOptions()));
144
145
146 if (readWriteConfig.isSetSocketOptions())
147 clusterInfo.setSocketOptions(new ConfigInstantiator<SocketOptions>().instantiate(readWriteConfig.getSocketOptions()));
148
149
150
151
152
153
154 logger.debug("Created DataSourceInfo: " + clusterInfo);
155 return clusterInfo;
156 }
157
158 }