Changeset 521:24519f17c20a
- Timestamp:
- 11/12/11 23:26:46 (6 months ago)
- Branch:
- default
- Location:
- kraken-rpc
- Files:
-
- 5 edited
-
pom.xml (modified) (3 diffs)
-
src/main/java/org/krakenapps/rpc/RpcAgent.java (modified) (1 diff)
-
src/main/java/org/krakenapps/rpc/impl/RpcAgentImpl.java (modified) (11 diffs)
-
src/main/java/org/krakenapps/rpc/impl/RpcHandler.java (modified) (9 diffs)
-
src/main/java/org/krakenapps/rpc/impl/RpcScript.java (modified) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
kraken-rpc/pom.xml
r455 r521 11 11 <groupId>org.krakenapps</groupId> 12 12 <artifactId>kraken-rpc</artifactId> 13 <version>1. 2.3</version>13 <version>1.3.0</version> 14 14 <packaging>bundle</packaging> 15 15 <name>Kraken RPC</name> … … 54 54 <groupId>org.krakenapps</groupId> 55 55 <artifactId>kraken-api</artifactId> 56 <version>1. 6.0</version>56 <version>1.9.1</version> 57 57 </dependency> 58 58 <dependency> … … 81 81 <version>2.1.0</version> 82 82 </dependency> 83 <dependency> 84 <groupId>org.krakenapps</groupId> 85 <artifactId>kraken-confdb</artifactId> 86 <version>0.3.0</version> 87 </dependency> 83 88 </dependencies> 84 89 </project> -
kraken-rpc/src/main/java/org/krakenapps/rpc/RpcAgent.java
r386 r521 5 5 public interface RpcAgent { 6 6 String getGuid(); 7 8 Collection<RpcBindingProperties> getBindings(); 9 10 void open(RpcBindingProperties props); 11 12 void close(RpcBindingProperties props); 7 13 8 14 RpcConnection connect(RpcConnectionProperties props); -
kraken-rpc/src/main/java/org/krakenapps/rpc/impl/RpcAgentImpl.java
r386 r521 3 3 import java.net.InetSocketAddress; 4 4 import java.security.SecureRandom; 5 import java.util.ArrayList; 5 6 import java.util.Collection; 6 7 import java.util.UUID; 8 import java.util.concurrent.ConcurrentHashMap; 9 import java.util.concurrent.ConcurrentMap; 7 10 import java.util.concurrent.Executors; 8 11 … … 28 31 import org.jboss.netty.handler.ssl.SslHandler; 29 32 import org.krakenapps.api.KeyStoreManager; 33 import org.krakenapps.confdb.Config; 34 import org.krakenapps.confdb.ConfigDatabase; 35 import org.krakenapps.confdb.ConfigIterator; 36 import org.krakenapps.confdb.ConfigService; 37 import org.krakenapps.confdb.Predicate; 38 import org.krakenapps.confdb.Predicates; 39 import org.krakenapps.rpc.RpcBindingProperties; 30 40 import org.krakenapps.rpc.RpcClient; 31 41 import org.krakenapps.rpc.RpcConnection; … … 49 59 private RpcPeerRegistry peerRegistry; 50 60 51 private Channel listener;52 private Channel sslListener;53 54 private int plainPort = 7139;55 private int sslPort = 7140;56 61 private RpcHandler handler; 57 62 private RpcServiceTracker tracker; 58 63 64 private ConcurrentMap<RpcBindingProperties, Channel> bindings; 65 59 66 @Requires 60 67 private KeyStoreManager keyStoreManager; 68 69 @Requires 70 private ConfigService conf; 61 71 62 72 public RpcAgentImpl(BundleContext bc) { … … 65 75 handler = new RpcHandler(getGuid(), peerRegistry); 66 76 tracker = new RpcServiceTracker(bc, handler); 77 bindings = new ConcurrentHashMap<RpcBindingProperties, Channel>(); 67 78 } 68 79 … … 72 83 handler.start(); 73 84 bc.addServiceListener(tracker); 74 bind(plainPort); 75 bindSsl(sslPort, "rpc-ca", "rpc-agent"); 85 86 // open configured bindings 87 ConfigDatabase db = conf.ensureDatabase("kraken-rpc"); 88 ConfigIterator it = db.findAll(RpcBindingProperties.class); 89 try { 90 while (it.hasNext()) { 91 Config c = it.next(); 92 RpcBindingProperties props = c.getDocument(RpcBindingProperties.class); 93 if (props.getKeyAlias() != null && props.getTrustAlias() != null) 94 bindSsl(props); 95 else 96 bind(props); 97 } 98 } finally { 99 it.close(); 100 } 76 101 77 102 // register all auto-wiring RPC services. … … 85 110 @Invalidate 86 111 public void stop() { 87 unbind(); 88 unbindSsl(); 112 for (RpcBindingProperties props : bindings.keySet()) 113 unbind(props); 114 89 115 bc.removeServiceListener(tracker); 90 116 handler.stop(); … … 115 141 116 142 @Override 143 public Collection<RpcBindingProperties> getBindings() { 144 return new ArrayList<RpcBindingProperties>(bindings.keySet()); 145 } 146 147 @Override 148 public void open(RpcBindingProperties props) { 149 if (bindings.containsKey(props)) 150 throw new IllegalStateException("already opened: " + props); 151 152 if (props.getKeyAlias() != null && props.getTrustAlias() != null) 153 bindSsl(props); 154 else 155 bind(props); 156 157 ConfigDatabase db = conf.ensureDatabase("kraken-rpc"); 158 db.add(props, "kraken-rpc", "opened " + props); 159 } 160 161 @Override 162 public void close(RpcBindingProperties props) { 163 ConfigDatabase db = conf.ensureDatabase("kraken-rpc"); 164 Predicate p = Predicates.and(Predicates.field("addr", props.getHost()), 165 Predicates.field("port", props.getPort())); 166 167 Config c = db.findOne(RpcBindingProperties.class, p); 168 if (c != null) 169 db.remove(c, false, "kraken-rpc", "closed " + props); 170 171 unbind(props); 172 } 173 174 @Override 117 175 public RpcConnection connectSsl(RpcConnectionProperties props) { 118 176 RpcClient client = new RpcClient(handler); … … 126 184 } 127 185 128 public void bindSsl(int port, final String trustStoreAlias, final String keyStoreAlias) throws Exception { 129 if (sslListener != null) { 130 logger.warn("kraken-rpc: rpc port already opened, {}", listener.getRemoteAddress()); 131 return; 132 } 133 186 private Channel bindSsl(RpcBindingProperties props) { 134 187 ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), 135 188 Executors.newCachedThreadPool()); 189 190 final String keyAlias = props.getKeyAlias(); 191 final String trustAlias = props.getTrustAlias(); 136 192 137 193 ServerBootstrap bootstrap = new ServerBootstrap(factory); … … 146 202 147 203 // ssl 148 TrustManagerFactory tmf = keyStoreManager.getTrustManagerFactory(trust StoreAlias, "SunX509");149 KeyManagerFactory kmf = keyStoreManager.getKeyManagerFactory(key StoreAlias, "SunX509");204 TrustManagerFactory tmf = keyStoreManager.getTrustManagerFactory(trustAlias, "SunX509"); 205 KeyManagerFactory kmf = keyStoreManager.getKeyManagerFactory(keyAlias, "SunX509"); 150 206 151 207 TrustManager[] trustManagers = null; … … 174 230 }); 175 231 176 sslListener = bootstrap.bind(new InetSocketAddress(port)); 177 178 logger.info("kraken-rpc: {} ssl port opened", port); 179 } 180 181 public void bind(int port) throws Exception { 182 if (listener != null) { 183 logger.warn("kraken-rpc: rpc port already opened, {}", listener.getRemoteAddress()); 184 return; 185 } 186 232 InetSocketAddress address = new InetSocketAddress(props.getHost(), props.getPort()); 233 Channel channel = bootstrap.bind(address); 234 bindings.put(props, channel); 235 236 logger.info("kraken-rpc: {} ssl port opened", address); 237 return channel; 238 } 239 240 private Channel bind(RpcBindingProperties props) { 187 241 ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), 188 242 Executors.newCachedThreadPool()); … … 199 253 bootstrap.setOption("child.keepAlive", true); 200 254 201 listener = bootstrap.bind(new InetSocketAddress(port)); 202 203 logger.info("kraken-rpc: {} port opened", port); 204 } 205 206 public void unbind() { 207 if (listener == null) 255 InetSocketAddress address = new InetSocketAddress(props.getHost(), props.getPort()); 256 257 Channel channel = bootstrap.bind(address); 258 bindings.put(props, channel); 259 260 logger.info("kraken-rpc: {} port opened", address); 261 return channel; 262 } 263 264 private void unbind(RpcBindingProperties props) { 265 Channel channel = bindings.remove(props); 266 if (channel == null) 208 267 return; 209 268 210 logger.info("kraken-rpc: unbinding listen port"); 211 listener.unbind(); 212 listener.close().awaitUninterruptibly(); 213 listener = null; 214 } 215 216 public void unbindSsl() { 217 if (sslListener == null) 218 return; 219 220 logger.info("kraken-rpc: unbinding ssl listen port"); 221 sslListener.unbind(); 222 sslListener.close().awaitUninterruptibly(); 223 sslListener = null; 269 logger.info("kraken-rpc: unbinding [{}]", props); 270 channel.unbind(); 271 channel.close().awaitUninterruptibly(); 224 272 } 225 273 -
kraken-rpc/src/main/java/org/krakenapps/rpc/impl/RpcHandler.java
r386 r521 80 80 81 81 public void stop() { 82 executor.shutdown(); 83 executor = null; 82 if (executor != null) { 83 executor.shutdown(); 84 executor = null; 85 } 84 86 85 87 for (RpcConnection conn : connMap.values()) … … 116 118 // register handler to connection 117 119 newConnection.addListener(this); 118 120 119 121 // add to connection 120 122 connMap.put(channel.getId(), newConnection); 121 123 122 logger.info("kraken-rpc: [{}] {} connected, remote={}", new Object[] { newConnection.getId(),123 new Connection.isClient() ? "client" : "server", addr });124 logger.info("kraken-rpc: [{}] {} connected, remote={}", 125 new Object[] { newConnection.getId(), newConnection.isClient() ? "client" : "server", addr }); 124 126 125 127 // ssl handshake … … 153 155 154 156 String peerCommonName = peerCert.getSubjectDN().getName(); 155 logger.info("kraken-rpc: new peer certificate subject={}, remote={}", peerCommonName, channel.getRemoteAddress()); 157 logger.info("kraken-rpc: new peer certificate subject={}, remote={}", peerCommonName, 158 channel.getRemoteAddress()); 156 159 157 160 if (executor != null) … … 190 193 } 191 194 192 private RpcConnection initializeConnection(Channel channel, RpcConnectionProperties props) throws InterruptedException { 195 private RpcConnection initializeConnection(Channel channel, RpcConnectionProperties props) 196 throws InterruptedException { 193 197 RpcConnectionImpl newConnection = null; 194 198 … … 295 299 296 300 if (logger.isTraceEnabled()) 297 logger.trace("kraken-rpc: msg received - connection: {}, session: {}, message id: {}, type: {}, method: {}", 301 logger.trace( 302 "kraken-rpc: msg received - connection: {}, session: {}, message id: {}, type: {}, method: {}", 298 303 new Object[] { conn.getId(), sessionId, id, type, methodName }); 299 304 300 305 RpcSession session = conn.findSession(sessionId); 301 306 if (session == null) { 302 logger.warn("kraken-rpc: session {} not found, connection={}, peer={}", new Object[] { sessionId, conn.getId(),303 conn.getRemoteAddress() });307 logger.warn("kraken-rpc: session {} not found, connection={}, peer={}", 308 new Object[] { sessionId, conn.getId(), conn.getRemoteAddress() }); 304 309 return; 305 310 } … … 323 328 channel.write(resp); 324 329 325 logger.trace("kraken-rpc: return for [id={}, ret={}, session={}, method={}]", new Object[] { newId, id,326 session.getId(), methodName });330 logger.trace("kraken-rpc: return for [id={}, ret={}, session={}, method={}]", new Object[] { newId, 331 id, session.getId(), methodName }); 327 332 } catch (Throwable t) { 328 333 if (t.getCause() != null) … … 414 419 String methodName = msg.getString("method"); 415 420 416 logger.trace("kraken-rpc: received rpc-call, connection={}, session={}, method={}", new Object[] { connectionId,417 sessionId, methodName });421 logger.trace("kraken-rpc: received rpc-call, connection={}, session={}, method={}", new Object[] { 422 connectionId, sessionId, methodName }); 418 423 419 424 Method method = binding.getMethod(methodName); … … 432 437 if (logger.isTraceEnabled()) 433 438 traceMethodCall(connectionId, sessionId, methodName, args); 434 439 435 440 return method.invoke(binding.getService(), args); 436 441 } … … 449 454 } 450 455 451 logger.trace("kraken rpc: method call, conn [{}], session [{}], method [{}], params [{}]", new Object[] { conn, session,452 method, sb.toString() });456 logger.trace("kraken rpc: method call, conn [{}], session [{}], method [{}], params [{}]", new Object[] { conn, 457 session, method, sb.toString() }); 453 458 } 454 459 -
kraken-rpc/src/main/java/org/krakenapps/rpc/impl/RpcScript.java
r373 r521 16 16 import org.krakenapps.api.ScriptContext; 17 17 import org.krakenapps.api.ScriptUsage; 18 import org.krakenapps.rpc.RpcBindingProperties; 18 19 import org.krakenapps.rpc.RpcBlockingTable; 19 20 import org.krakenapps.rpc.RpcConnection; … … 43 44 public void setScriptContext(ScriptContext context) { 44 45 this.context = context; 46 } 47 48 public void bindings(String[] args) { 49 context.println("Port Bindings"); 50 context.println("---------------"); 51 for (RpcBindingProperties props : agent.getBindings()) 52 context.println(props); 53 } 54 55 @ScriptUsage(description = "open rpc port", arguments = { 56 @ScriptArgument(name = "port", type = "int", description = "port"), 57 @ScriptArgument(name = "ip", type = "string", description = "bind address", optional = true) }) 58 public void open(String[] args) { 59 RpcBindingProperties props = null; 60 try { 61 props = inputBindingProps(args); 62 agent.open(props); 63 context.println("opened "); 64 } catch (NumberFormatException e) { 65 context.println("invalid port number format"); 66 } catch (Exception e) { 67 context.println(e.getMessage()); 68 logger.error("kraken rpc: cannot bind " + props, e); 69 } 70 } 71 72 @ScriptUsage(description = "open rpc port", arguments = { 73 @ScriptArgument(name = "port", type = "int", description = "port"), 74 @ScriptArgument(name = "key alias", type = "string", description = "key alias"), 75 @ScriptArgument(name = "trust alias", type = "string", description = "trust alias"), 76 @ScriptArgument(name = "ip", type = "string", description = "bind address", optional = true) }) 77 public void openSsl(String[] args) { 78 RpcBindingProperties props = null; 79 try { 80 String keyAlias = args[1]; 81 String trustAlias = args[2]; 82 String ip = args[3]; 83 84 args[1] = ip; 85 args[2] = keyAlias; 86 args[3] = trustAlias; 87 props = inputBindingProps(args); 88 89 agent.open(props); 90 context.println("opened "); 91 } catch (NumberFormatException e) { 92 context.println("invalid port number format"); 93 } catch (Exception e) { 94 context.println(e.getMessage()); 95 logger.error("kraken rpc: cannot bind " + props, e); 96 } 97 } 98 99 public void close(String[] args) { 100 try { 101 RpcBindingProperties props = inputBindingProps(args); 102 if (!agent.getBindings().contains(props)) { 103 context.println("not opened"); 104 return; 105 } 106 107 agent.close(props); 108 } catch (NumberFormatException e) { 109 context.println("invalid port number format"); 110 } catch (Exception e) { 111 context.println(e.getMessage()); 112 } 113 } 114 115 private RpcBindingProperties inputBindingProps(String[] args) { 116 int port = Integer.valueOf(args[0]); 117 if (port < 1 || port > 65535) 118 throw new IllegalArgumentException("port number should be 1~65535"); 119 120 String addr = "0.0.0.0"; 121 if (args.length > 1) 122 addr = args[1]; 123 124 String keyAlias = null; 125 String trustAlias = null; 126 127 if (args.length >= 4) { 128 keyAlias = args[2]; 129 trustAlias = args[3]; 130 } 131 132 return new RpcBindingProperties(addr, port, keyAlias, trustAlias); 45 133 } 46 134 … … 266 354 // listening providers 267 355 @ScriptUsage(description = "list all bound services", arguments = { @ScriptArgument(name = "cid", type = "int", description = "connection id") }) 268 public void bindings(String[] args) {356 public void services(String[] args) { 269 357 int id = Integer.parseInt(args[0]); 270 358 RpcConnection conn = agent.findConnection(id);
Note: See TracChangeset
for help on using the changeset viewer.
