Ignore:
Timestamp:
11/14/11 02:31:08 (6 months ago)
Author:
xeraph
Branch:
default
Message:

added mapreduce query (not yet completed, and rpc has race condition bug)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • kraken-logdb/src/main/java/org/krakenapps/logdb/query/command/Rpc.java

    r526 r532  
    22 
    33import java.util.Map; 
    4  
     4import org.krakenapps.api.Primitive; 
    55import org.krakenapps.logdb.LogQueryCommand; 
     6import org.krakenapps.rpc.RpcClient; 
     7import org.krakenapps.rpc.RpcConnection; 
     8import org.krakenapps.rpc.RpcConnectionProperties; 
     9import org.krakenapps.rpc.RpcException; 
     10import org.krakenapps.rpc.RpcSession; 
     11import org.slf4j.Logger; 
     12import org.slf4j.LoggerFactory; 
    613 
    714public class Rpc extends LogQueryCommand { 
     15        private final Logger logger = LoggerFactory.getLogger(Rpc.class.getName()); 
     16 
     17        private String agentGuid; 
     18        private RpcConnection upstream; 
     19 
    820        /** 
    921         * dist query guid 
     
    1325        private boolean sender; 
    1426 
    15         public Rpc(String guid, boolean sender) { 
     27        private RpcConnection datastream; 
     28        private RpcSession datasession; 
     29 
     30        public Rpc(String agentGuid, RpcConnection upstream, String guid, boolean sender) { 
     31                this.agentGuid = agentGuid; 
     32                this.upstream = upstream; 
    1633                this.guid = guid; 
    1734                this.sender = sender; 
     
    2037        @Override 
    2138        public void push(Map<String, Object> m) { 
    22                 write(m); 
     39                if (sender) { 
     40                        if (datastream == null) { 
     41                                RpcClient client = new RpcClient(agentGuid); 
     42                                RpcConnectionProperties props = new RpcConnectionProperties(upstream.getRemoteAddress()); 
     43                                props.setPassword((String) upstream.getProperty("password")); 
     44                                datastream = client.connect(props); 
     45                                try { 
     46                                        datasession = datastream.createSession("logdb-mapreduce"); 
     47                                        datasession.call("setLogStream", guid); 
     48                                } catch (RpcException e) { 
     49                                        logger.error("kraken logdb: cannot set log stream", e); 
     50                                } catch (InterruptedException e) { 
     51                                        logger.error("kraken logdb: cannot set log stream", e); 
     52                                } 
     53 
     54                                logger.info("kraken logdb: opened rpc data stream for query guid [{}]", guid); 
     55                        } 
     56 
     57                        datasession.post("push", m); 
     58                } else { 
     59                        write(m); 
     60                } 
     61 
     62                if (logger.isDebugEnabled()) 
     63                        logger.debug("kraken logdb: rpc [{}]", Primitive.stringify(m)); 
    2364        } 
    2465 
     
    3475 
    3576        @Override 
     77        public void eof() { 
     78                if (datasession != null) { 
     79                        datastream.close(); 
     80                        datastream = null; 
     81                        datasession = null; 
     82                } 
     83 
     84                logger.info("kraken logdb: closed rpc data stream for query guid [{}]", guid); 
     85                super.eof(); 
     86        } 
     87 
     88        @Override 
    3689        public String toString() { 
    3790                return "RPC " + (sender ? "Output" : "Input") + guid; 
Note: See TracChangeset for help on using the changeset viewer.