Changeset 521:24519f17c20a


Ignore:
Timestamp:
11/12/11 23:26:46 (6 months ago)
Author:
xeraph
Branch:
default
Message:
  • added rpc binding control (rpc.bindings, rpc.open, rpc.openSsl, rpc.close commands)
Location:
kraken-rpc
Files:
5 edited

Legend:

Unmodified
Added
Removed
  • kraken-rpc/pom.xml

    r455 r521  
    1111        <groupId>org.krakenapps</groupId> 
    1212        <artifactId>kraken-rpc</artifactId> 
    13         <version>1.2.3</version> 
     13        <version>1.3.0</version> 
    1414        <packaging>bundle</packaging> 
    1515        <name>Kraken RPC</name> 
     
    5454                        <groupId>org.krakenapps</groupId> 
    5555                        <artifactId>kraken-api</artifactId> 
    56                         <version>1.6.0</version> 
     56                        <version>1.9.1</version> 
    5757                </dependency> 
    5858                <dependency> 
     
    8181                        <version>2.1.0</version> 
    8282                </dependency> 
     83                <dependency> 
     84                        <groupId>org.krakenapps</groupId> 
     85                        <artifactId>kraken-confdb</artifactId> 
     86                        <version>0.3.0</version> 
     87                </dependency> 
    8388        </dependencies> 
    8489</project> 
  • kraken-rpc/src/main/java/org/krakenapps/rpc/RpcAgent.java

    r386 r521  
    55public interface RpcAgent { 
    66        String getGuid(); 
     7 
     8        Collection<RpcBindingProperties> getBindings(); 
     9 
     10        void open(RpcBindingProperties props); 
     11 
     12        void close(RpcBindingProperties props); 
    713 
    814        RpcConnection connect(RpcConnectionProperties props); 
  • kraken-rpc/src/main/java/org/krakenapps/rpc/impl/RpcAgentImpl.java

    r386 r521  
    33import java.net.InetSocketAddress; 
    44import java.security.SecureRandom; 
     5import java.util.ArrayList; 
    56import java.util.Collection; 
    67import java.util.UUID; 
     8import java.util.concurrent.ConcurrentHashMap; 
     9import java.util.concurrent.ConcurrentMap; 
    710import java.util.concurrent.Executors; 
    811 
     
    2831import org.jboss.netty.handler.ssl.SslHandler; 
    2932import org.krakenapps.api.KeyStoreManager; 
     33import org.krakenapps.confdb.Config; 
     34import org.krakenapps.confdb.ConfigDatabase; 
     35import org.krakenapps.confdb.ConfigIterator; 
     36import org.krakenapps.confdb.ConfigService; 
     37import org.krakenapps.confdb.Predicate; 
     38import org.krakenapps.confdb.Predicates; 
     39import org.krakenapps.rpc.RpcBindingProperties; 
    3040import org.krakenapps.rpc.RpcClient; 
    3141import org.krakenapps.rpc.RpcConnection; 
     
    4959        private RpcPeerRegistry peerRegistry; 
    5060 
    51         private Channel listener; 
    52         private Channel sslListener; 
    53  
    54         private int plainPort = 7139; 
    55         private int sslPort = 7140; 
    5661        private RpcHandler handler; 
    5762        private RpcServiceTracker tracker; 
    5863 
     64        private ConcurrentMap<RpcBindingProperties, Channel> bindings; 
     65 
    5966        @Requires 
    6067        private KeyStoreManager keyStoreManager; 
     68 
     69        @Requires 
     70        private ConfigService conf; 
    6171 
    6272        public RpcAgentImpl(BundleContext bc) { 
     
    6575                handler = new RpcHandler(getGuid(), peerRegistry); 
    6676                tracker = new RpcServiceTracker(bc, handler); 
     77                bindings = new ConcurrentHashMap<RpcBindingProperties, Channel>(); 
    6778        } 
    6879 
     
    7283                        handler.start(); 
    7384                        bc.addServiceListener(tracker); 
    74                         bind(plainPort); 
    75                         bindSsl(sslPort, "rpc-ca", "rpc-agent"); 
     85 
     86                        // open configured bindings 
     87                        ConfigDatabase db = conf.ensureDatabase("kraken-rpc"); 
     88                        ConfigIterator it = db.findAll(RpcBindingProperties.class); 
     89                        try { 
     90                                while (it.hasNext()) { 
     91                                        Config c = it.next(); 
     92                                        RpcBindingProperties props = c.getDocument(RpcBindingProperties.class); 
     93                                        if (props.getKeyAlias() != null && props.getTrustAlias() != null) 
     94                                                bindSsl(props); 
     95                                        else 
     96                                                bind(props); 
     97                                } 
     98                        } finally { 
     99                                it.close(); 
     100                        } 
    76101 
    77102                        // register all auto-wiring RPC services. 
     
    85110        @Invalidate 
    86111        public void stop() { 
    87                 unbind(); 
    88                 unbindSsl(); 
     112                for (RpcBindingProperties props : bindings.keySet()) 
     113                        unbind(props); 
     114 
    89115                bc.removeServiceListener(tracker); 
    90116                handler.stop(); 
     
    115141 
    116142        @Override 
     143        public Collection<RpcBindingProperties> getBindings() { 
     144                return new ArrayList<RpcBindingProperties>(bindings.keySet()); 
     145        } 
     146 
     147        @Override 
     148        public void open(RpcBindingProperties props) { 
     149                if (bindings.containsKey(props)) 
     150                        throw new IllegalStateException("already opened: " + props); 
     151 
     152                if (props.getKeyAlias() != null && props.getTrustAlias() != null) 
     153                        bindSsl(props); 
     154                else 
     155                        bind(props); 
     156 
     157                ConfigDatabase db = conf.ensureDatabase("kraken-rpc"); 
     158                db.add(props, "kraken-rpc", "opened " + props); 
     159        } 
     160 
     161        @Override 
     162        public void close(RpcBindingProperties props) { 
     163                ConfigDatabase db = conf.ensureDatabase("kraken-rpc"); 
     164                Predicate p = Predicates.and(Predicates.field("addr", props.getHost()), 
     165                                Predicates.field("port", props.getPort())); 
     166 
     167                Config c = db.findOne(RpcBindingProperties.class, p); 
     168                if (c != null) 
     169                        db.remove(c, false, "kraken-rpc", "closed " + props); 
     170 
     171                unbind(props); 
     172        } 
     173 
     174        @Override 
    117175        public RpcConnection connectSsl(RpcConnectionProperties props) { 
    118176                RpcClient client = new RpcClient(handler); 
     
    126184        } 
    127185 
    128         public void bindSsl(int port, final String trustStoreAlias, final String keyStoreAlias) throws Exception { 
    129                 if (sslListener != null) { 
    130                         logger.warn("kraken-rpc: rpc port already opened, {}", listener.getRemoteAddress()); 
    131                         return; 
    132                 } 
    133  
     186        private Channel bindSsl(RpcBindingProperties props) { 
    134187                ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), 
    135188                                Executors.newCachedThreadPool()); 
     189 
     190                final String keyAlias = props.getKeyAlias(); 
     191                final String trustAlias = props.getTrustAlias(); 
    136192 
    137193                ServerBootstrap bootstrap = new ServerBootstrap(factory); 
     
    146202 
    147203                                // ssl 
    148                                 TrustManagerFactory tmf = keyStoreManager.getTrustManagerFactory(trustStoreAlias, "SunX509"); 
    149                                 KeyManagerFactory kmf = keyStoreManager.getKeyManagerFactory(keyStoreAlias, "SunX509"); 
     204                                TrustManagerFactory tmf = keyStoreManager.getTrustManagerFactory(trustAlias, "SunX509"); 
     205                                KeyManagerFactory kmf = keyStoreManager.getKeyManagerFactory(keyAlias, "SunX509"); 
    150206 
    151207                                TrustManager[] trustManagers = null; 
     
    174230                }); 
    175231 
    176                 sslListener = bootstrap.bind(new InetSocketAddress(port)); 
    177  
    178                 logger.info("kraken-rpc: {} ssl port opened", port); 
    179         } 
    180  
    181         public void bind(int port) throws Exception { 
    182                 if (listener != null) { 
    183                         logger.warn("kraken-rpc: rpc port already opened, {}", listener.getRemoteAddress()); 
    184                         return; 
    185                 } 
    186  
     232                InetSocketAddress address = new InetSocketAddress(props.getHost(), props.getPort()); 
     233                Channel channel = bootstrap.bind(address); 
     234                bindings.put(props, channel); 
     235 
     236                logger.info("kraken-rpc: {} ssl port opened", address); 
     237                return channel; 
     238        } 
     239 
     240        private Channel bind(RpcBindingProperties props) { 
    187241                ChannelFactory factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), 
    188242                                Executors.newCachedThreadPool()); 
     
    199253                bootstrap.setOption("child.keepAlive", true); 
    200254 
    201                 listener = bootstrap.bind(new InetSocketAddress(port)); 
    202  
    203                 logger.info("kraken-rpc: {} port opened", port); 
    204         } 
    205  
    206         public void unbind() { 
    207                 if (listener == null) 
     255                InetSocketAddress address = new InetSocketAddress(props.getHost(), props.getPort()); 
     256 
     257                Channel channel = bootstrap.bind(address); 
     258                bindings.put(props, channel); 
     259 
     260                logger.info("kraken-rpc: {} port opened", address); 
     261                return channel; 
     262        } 
     263 
     264        private void unbind(RpcBindingProperties props) { 
     265                Channel channel = bindings.remove(props); 
     266                if (channel == null) 
    208267                        return; 
    209268 
    210                 logger.info("kraken-rpc: unbinding listen port"); 
    211                 listener.unbind(); 
    212                 listener.close().awaitUninterruptibly(); 
    213                 listener = null; 
    214         } 
    215  
    216         public void unbindSsl() { 
    217                 if (sslListener == null) 
    218                         return; 
    219  
    220                 logger.info("kraken-rpc: unbinding ssl listen port"); 
    221                 sslListener.unbind(); 
    222                 sslListener.close().awaitUninterruptibly(); 
    223                 sslListener = null; 
     269                logger.info("kraken-rpc: unbinding [{}]", props); 
     270                channel.unbind(); 
     271                channel.close().awaitUninterruptibly(); 
    224272        } 
    225273 
  • kraken-rpc/src/main/java/org/krakenapps/rpc/impl/RpcHandler.java

    r386 r521  
    8080 
    8181        public void stop() { 
    82                 executor.shutdown(); 
    83                 executor = null; 
     82                if (executor != null) { 
     83                        executor.shutdown(); 
     84                        executor = null; 
     85                } 
    8486 
    8587                for (RpcConnection conn : connMap.values()) 
     
    116118                // register handler to connection 
    117119                newConnection.addListener(this); 
    118                  
     120 
    119121                // add to connection 
    120122                connMap.put(channel.getId(), newConnection); 
    121123 
    122                 logger.info("kraken-rpc: [{}] {} connected, remote={}", new Object[] { newConnection.getId(), 
    123                                 newConnection.isClient() ? "client" : "server", addr }); 
     124                logger.info("kraken-rpc: [{}] {} connected, remote={}", 
     125                                new Object[] { newConnection.getId(), newConnection.isClient() ? "client" : "server", addr }); 
    124126 
    125127                // ssl handshake 
     
    153155 
    154156                        String peerCommonName = peerCert.getSubjectDN().getName(); 
    155                         logger.info("kraken-rpc: new peer certificate subject={}, remote={}", peerCommonName, channel.getRemoteAddress()); 
     157                        logger.info("kraken-rpc: new peer certificate subject={}, remote={}", peerCommonName, 
     158                                        channel.getRemoteAddress()); 
    156159 
    157160                        if (executor != null) 
     
    190193        } 
    191194 
    192         private RpcConnection initializeConnection(Channel channel, RpcConnectionProperties props) throws InterruptedException { 
     195        private RpcConnection initializeConnection(Channel channel, RpcConnectionProperties props) 
     196                        throws InterruptedException { 
    193197                RpcConnectionImpl newConnection = null; 
    194198 
     
    295299 
    296300                        if (logger.isTraceEnabled()) 
    297                                 logger.trace("kraken-rpc: msg received - connection: {}, session: {}, message id: {}, type: {}, method: {}", 
     301                                logger.trace( 
     302                                                "kraken-rpc: msg received - connection: {}, session: {}, message id: {}, type: {}, method: {}", 
    298303                                                new Object[] { conn.getId(), sessionId, id, type, methodName }); 
    299304 
    300305                        RpcSession session = conn.findSession(sessionId); 
    301306                        if (session == null) { 
    302                                 logger.warn("kraken-rpc: session {} not found, connection={}, peer={}", new Object[] { sessionId, conn.getId(), 
    303                                                 conn.getRemoteAddress() }); 
     307                                logger.warn("kraken-rpc: session {} not found, connection={}, peer={}", 
     308                                                new Object[] { sessionId, conn.getId(), conn.getRemoteAddress() }); 
    304309                                return; 
    305310                        } 
     
    323328                                        channel.write(resp); 
    324329 
    325                                         logger.trace("kraken-rpc: return for [id={}, ret={}, session={}, method={}]", new Object[] { newId, id, 
    326                                                         session.getId(), methodName }); 
     330                                        logger.trace("kraken-rpc: return for [id={}, ret={}, session={}, method={}]", new Object[] { newId, 
     331                                                        id, session.getId(), methodName }); 
    327332                                } catch (Throwable t) { 
    328333                                        if (t.getCause() != null) 
     
    414419                String methodName = msg.getString("method"); 
    415420 
    416                 logger.trace("kraken-rpc: received rpc-call, connection={}, session={}, method={}", new Object[] { connectionId, 
    417                                 sessionId, methodName }); 
     421                logger.trace("kraken-rpc: received rpc-call, connection={}, session={}, method={}", new Object[] { 
     422                                connectionId, sessionId, methodName }); 
    418423 
    419424                Method method = binding.getMethod(methodName); 
     
    432437                                if (logger.isTraceEnabled()) 
    433438                                        traceMethodCall(connectionId, sessionId, methodName, args); 
    434                                  
     439 
    435440                                return method.invoke(binding.getService(), args); 
    436441                        } 
     
    449454                } 
    450455 
    451                 logger.trace("kraken rpc: method call, conn [{}], session [{}], method [{}], params [{}]", new Object[] { conn, session, 
    452                                 method, sb.toString() }); 
     456                logger.trace("kraken rpc: method call, conn [{}], session [{}], method [{}], params [{}]", new Object[] { conn, 
     457                                session, method, sb.toString() }); 
    453458        } 
    454459 
  • kraken-rpc/src/main/java/org/krakenapps/rpc/impl/RpcScript.java

    r373 r521  
    1616import org.krakenapps.api.ScriptContext; 
    1717import org.krakenapps.api.ScriptUsage; 
     18import org.krakenapps.rpc.RpcBindingProperties; 
    1819import org.krakenapps.rpc.RpcBlockingTable; 
    1920import org.krakenapps.rpc.RpcConnection; 
     
    4344        public void setScriptContext(ScriptContext context) { 
    4445                this.context = context; 
     46        } 
     47 
     48        public void bindings(String[] args) { 
     49                context.println("Port Bindings"); 
     50                context.println("---------------"); 
     51                for (RpcBindingProperties props : agent.getBindings()) 
     52                        context.println(props); 
     53        } 
     54 
     55        @ScriptUsage(description = "open rpc port", arguments = { 
     56                        @ScriptArgument(name = "port", type = "int", description = "port"), 
     57                        @ScriptArgument(name = "ip", type = "string", description = "bind address", optional = true) }) 
     58        public void open(String[] args) { 
     59                RpcBindingProperties props = null; 
     60                try { 
     61                        props = inputBindingProps(args); 
     62                        agent.open(props); 
     63                        context.println("opened "); 
     64                } catch (NumberFormatException e) { 
     65                        context.println("invalid port number format"); 
     66                } catch (Exception e) { 
     67                        context.println(e.getMessage()); 
     68                        logger.error("kraken rpc: cannot bind " + props, e); 
     69                } 
     70        } 
     71 
     72        @ScriptUsage(description = "open rpc port", arguments = { 
     73                        @ScriptArgument(name = "port", type = "int", description = "port"), 
     74                        @ScriptArgument(name = "key alias", type = "string", description = "key alias"), 
     75                        @ScriptArgument(name = "trust alias", type = "string", description = "trust alias"), 
     76                        @ScriptArgument(name = "ip", type = "string", description = "bind address", optional = true) }) 
     77        public void openSsl(String[] args) { 
     78                RpcBindingProperties props = null; 
     79                try { 
     80                        String keyAlias = args[1]; 
     81                        String trustAlias = args[2]; 
     82                        String ip = args[3]; 
     83 
     84                        args[1] = ip; 
     85                        args[2] = keyAlias; 
     86                        args[3] = trustAlias; 
     87                        props = inputBindingProps(args); 
     88 
     89                        agent.open(props); 
     90                        context.println("opened "); 
     91                } catch (NumberFormatException e) { 
     92                        context.println("invalid port number format"); 
     93                } catch (Exception e) { 
     94                        context.println(e.getMessage()); 
     95                        logger.error("kraken rpc: cannot bind " + props, e); 
     96                } 
     97        } 
     98 
     99        public void close(String[] args) { 
     100                try { 
     101                        RpcBindingProperties props = inputBindingProps(args); 
     102                        if (!agent.getBindings().contains(props)) { 
     103                                context.println("not opened"); 
     104                                return; 
     105                        } 
     106 
     107                        agent.close(props); 
     108                } catch (NumberFormatException e) { 
     109                        context.println("invalid port number format"); 
     110                } catch (Exception e) { 
     111                        context.println(e.getMessage()); 
     112                } 
     113        } 
     114 
     115        private RpcBindingProperties inputBindingProps(String[] args) { 
     116                int port = Integer.valueOf(args[0]); 
     117                if (port < 1 || port > 65535) 
     118                        throw new IllegalArgumentException("port number should be 1~65535"); 
     119 
     120                String addr = "0.0.0.0"; 
     121                if (args.length > 1) 
     122                        addr = args[1]; 
     123 
     124                String keyAlias = null; 
     125                String trustAlias = null; 
     126 
     127                if (args.length >= 4) { 
     128                        keyAlias = args[2]; 
     129                        trustAlias = args[3]; 
     130                } 
     131                 
     132                return new RpcBindingProperties(addr, port, keyAlias, trustAlias); 
    45133        } 
    46134 
     
    266354        // listening providers 
    267355        @ScriptUsage(description = "list all bound services", arguments = { @ScriptArgument(name = "cid", type = "int", description = "connection id") }) 
    268         public void bindings(String[] args) { 
     356        public void services(String[] args) { 
    269357                int id = Integer.parseInt(args[0]); 
    270358                RpcConnection conn = agent.findConnection(id); 
Note: See TracChangeset for help on using the changeset viewer.