Changeset 537:d8d85fe7264c


Ignore:
Timestamp:
11/14/11 14:56:36 (6 months ago)
Author:
xeraph
Branch:
default
Message:
  • separated RpcFrom and RpcTo class
  • serialized RpcFrom input data using queue (prevent potential synchronization error)
  • rpc mapper hang problem found
Location:
kraken-logdb/src/main/java/org/krakenapps/logdb
Files:
1 added
3 edited
1 moved

Legend:

Unmodified
Added
Removed
  • kraken-logdb/src/main/java/org/krakenapps/logdb/impl/LogDBScript.java

    r532 r537  
    3939import org.krakenapps.logdb.mapreduce.RemoteQuery; 
    4040import org.krakenapps.logdb.query.FileBufferList; 
    41 import org.krakenapps.logdb.query.command.Rpc; 
     41import org.krakenapps.logdb.query.command.RpcFrom; 
     42import org.krakenapps.logdb.query.command.RpcTo; 
    4243import org.krakenapps.rpc.RpcConnection; 
    4344import org.krakenapps.rpc.RpcConnectionProperties; 
     
    236237                        @ScriptArgument(name = "sample string", type = "string", description = "sample string") }) 
    237238        public void rpcfrom(String[] args) { 
    238                 Rpc rpc = mapreduce.getRpcFrom(args[0]); 
     239                RpcFrom rpc = mapreduce.getRpcFrom(args[0]); 
    239240                if (rpc == null) { 
    240241                        context.println("rpc not found"); 
     
    249250        @ScriptUsage(description = "eof to rpcfrom", arguments = { @ScriptArgument(name = "guid", type = "string", description = "dist query guid") }) 
    250251        public void rpceof(String[] args) { 
    251                 Rpc rpc = mapreduce.getRpcFrom(args[0]); 
     252                RpcFrom rpc = mapreduce.getRpcFrom(args[0]); 
    252253                if (rpc == null) { 
    253254                        context.println("rpc not found"); 
  • kraken-logdb/src/main/java/org/krakenapps/logdb/impl/MapReduceRpcService.java

    r534 r537  
    4040import org.krakenapps.logdb.query.LogQueryImpl; 
    4141import org.krakenapps.logdb.query.StringPlaceholder; 
    42 import org.krakenapps.logdb.query.command.Rpc; 
     42import org.krakenapps.logdb.query.command.RpcFrom; 
     43import org.krakenapps.logdb.query.command.RpcTo; 
    4344import org.krakenapps.logdb.query.parser.QueryParser; 
    4445import org.krakenapps.rpc.RpcAgent; 
     
    8182         * mapreduce query guid to rpcfrom mappings 
    8283         */ 
    83         private ConcurrentMap<String, Rpc> rpcFromMap; 
     84        private ConcurrentMap<String, RpcFrom> rpcFromMap; 
    8485 
    8586        /** 
    8687         * mapreduce query guid to rpcto mappings 
    8788         */ 
    88         private ConcurrentMap<String, Rpc> rpcToMap; 
     89        private ConcurrentMap<String, RpcTo> rpcToMap; 
    8990 
    9091        /** 
     
    135136        public void start() { 
    136137                queries = new ConcurrentHashMap<String, MapReduceQueryStatus>(); 
    137                 rpcFromMap = new ConcurrentHashMap<String, Rpc>(); 
    138                 rpcToMap = new ConcurrentHashMap<String, Rpc>(); 
     138                rpcFromMap = new ConcurrentHashMap<String, RpcFrom>(); 
     139                rpcToMap = new ConcurrentHashMap<String, RpcTo>(); 
    139140                upstreams = new ConcurrentHashMap<String, RpcConnection>(); 
    140141                downstreams = new ConcurrentHashMap<String, RpcConnection>(); 
     
    170171                public Object parse(Binding b) { 
    171172                        String guid = (String) b.getChildren()[1].getValue(); 
    172                         Rpc rpc = new Rpc(agent.getGuid(), null, guid, false); 
     173                        RpcFrom rpc = new RpcFrom(guid); 
    173174                        rpcFromMap.put(guid, rpc); 
    174175                        return rpc; 
     
    187188                        String guid = (String) b.getChildren()[1].getValue(); 
    188189                        MapQuery mq = mapQueries.get(guid); 
    189                         Rpc rpc = new Rpc(agent.getGuid(), mq.getConnection(), guid, true); 
     190                        RpcTo rpc = new RpcTo(agent.getGuid(), mq.getConnection(), guid); 
    190191                        rpcToMap.put(guid, rpc); 
    191192                        return rpc; 
     
    194195 
    195196        @Override 
    196         public Rpc getRpcFrom(String guid) { 
     197        public RpcFrom getRpcFrom(String guid) { 
    197198                return rpcFromMap.get(guid); 
    198199        } 
    199200 
    200201        @Override 
    201         public Rpc getRpcTo(String guid) { 
     202        public RpcTo getRpcTo(String guid) { 
    202203                return rpcToMap.get(guid); 
    203204        } 
     
    213214                RpcSession session = RpcContext.getSession(); 
    214215                String queryGuid = (String) session.getProperty("guid"); 
    215                 Rpc rpc = rpcFromMap.get(queryGuid); 
     216                RpcFrom rpc = rpcFromMap.get(queryGuid); 
    216217                rpc.push(data); 
    217218 
     
    229230                String nodeGuid = session.getConnection().getPeerGuid(); 
    230231 
    231                 Rpc rpc = rpcFromMap.get(queryGuid); 
     232                RpcFrom rpc = rpcFromMap.get(queryGuid); 
    232233                if (rpc != null) 
    233234                        rpc.eof(); 
  • kraken-logdb/src/main/java/org/krakenapps/logdb/mapreduce/MapReduceService.java

    r530 r537  
    44import java.util.List; 
    55 
    6 import org.krakenapps.logdb.query.command.Rpc; 
     6import org.krakenapps.logdb.query.command.RpcFrom; 
     7import org.krakenapps.logdb.query.command.RpcTo; 
    78import org.krakenapps.rpc.RpcConnection; 
    89import org.krakenapps.rpc.RpcConnectionProperties; 
    910 
    1011public interface MapReduceService { 
    11         Rpc getRpcFrom(String guid); 
     12        RpcFrom getRpcFrom(String guid); 
    1213 
    13         Rpc getRpcTo(String guid); 
     14        RpcTo getRpcTo(String guid); 
    1415 
    1516        List<MapReduceQueryStatus> getQueries(); 
  • kraken-logdb/src/main/java/org/krakenapps/logdb/query/command/RpcTo.java

    r534 r537  
    1212import org.slf4j.LoggerFactory; 
    1313 
    14 public class Rpc extends LogQueryCommand { 
    15         private final Logger logger = LoggerFactory.getLogger(Rpc.class.getName()); 
     14public class RpcTo extends LogQueryCommand { 
     15        private final Logger logger = LoggerFactory.getLogger(RpcTo.class.getName()); 
    1616 
    1717        private String agentGuid; 
     
    2323        private String guid; 
    2424 
    25         private boolean sender; 
    26  
     25        private RpcClient client; 
    2726        private RpcConnection datastream; 
    2827        private RpcSession datasession; 
    29         private boolean end; 
    3028 
    31         public Rpc(String agentGuid, RpcConnection upstream, String guid, boolean sender) { 
     29        public RpcTo(String agentGuid, RpcConnection upstream, String guid) { 
    3230                this.agentGuid = agentGuid; 
    3331                this.upstream = upstream; 
    3432                this.guid = guid; 
    35                 this.sender = sender; 
    3633        } 
    3734 
    3835        @Override 
    3936        public void push(Map<String, Object> m) { 
    40                 if (end) { 
    41                         logger.info("kraken logdb: loss (will be fixed) - {}", Primitive.stringify(m)); 
    42                         return; 
     37 
     38                if (datastream == null) { 
     39                        client = new RpcClient(agentGuid); 
     40                        RpcConnectionProperties props = new RpcConnectionProperties(upstream.getRemoteAddress()); 
     41                        props.setPassword((String) upstream.getProperty("password")); 
     42                        datastream = client.connect(props); 
     43                        try { 
     44                                datasession = datastream.createSession("logdb-mapreduce"); 
     45                                datasession.call("setLogStream", guid); 
     46                        } catch (RpcException e) { 
     47                                logger.error("kraken logdb: cannot set log stream", e); 
     48                        } catch (InterruptedException e) { 
     49                                logger.error("kraken logdb: cannot set log stream", e); 
     50                        } 
     51 
     52                        logger.info("kraken logdb: opened rpc data stream for query guid [{}]", guid); 
    4353                } 
    4454 
    45                 if (sender) { 
    46                         if (datastream == null) { 
    47                                 RpcClient client = new RpcClient(agentGuid); 
    48                                 RpcConnectionProperties props = new RpcConnectionProperties(upstream.getRemoteAddress()); 
    49                                 props.setPassword((String) upstream.getProperty("password")); 
    50                                 datastream = client.connect(props); 
    51                                 try { 
    52                                         datasession = datastream.createSession("logdb-mapreduce"); 
    53                                         datasession.call("setLogStream", guid); 
    54                                 } catch (RpcException e) { 
    55                                         logger.error("kraken logdb: cannot set log stream", e); 
    56                                 } catch (InterruptedException e) { 
    57                                         logger.error("kraken logdb: cannot set log stream", e); 
    58                                 } 
    59  
    60                                 logger.info("kraken logdb: opened rpc data stream for query guid [{}]", guid); 
    61                         } 
    62  
    63                         datasession.post("push", m); 
    64                 } else { 
    65                         write(m); 
    66                 } 
     55                datasession.post("push", m); 
    6756 
    6857                if (logger.isDebugEnabled()) 
    69                         logger.debug("kraken logdb: rpc [{}]", Primitive.stringify(m)); 
     58                        logger.debug("kraken logdb: rpc mapper [{}]", Primitive.stringify(m)); 
    7059        } 
    7160 
     
    8978                        } 
    9079 
     80                        datasession.close(); 
    9181                        datastream.close(); 
     82                        client.close(); 
     83                        client = null; 
    9284                        datastream = null; 
    9385                        datasession = null; 
    9486                } 
    9587 
    96                 logger.info("kraken logdb: closed rpc data stream for query guid [{}]", guid); 
     88                logger.info("kraken logdb: closed rpc mapper stream for query guid [{}]", guid); 
    9789                super.eof(); 
    98                 end = true; 
    9990        } 
    10091 
    10192        @Override 
    10293        public String toString() { 
    103                 return "RPC " + (sender ? "Output" : "Input") + guid; 
     94                return "RPC Mapper [" + guid + "]"; 
    10495        } 
    105  
    10696} 
Note: See TracChangeset for help on using the changeset viewer.