Changeset 534:099c597c0243


Ignore:
Timestamp:
11/14/11 11:47:19 (3 months ago)
Author:
xeraph
Branch:
default
Children:
535:c38ad740171a, 537:d8d85fe7264c
Message:

moved eof to separated rpc method (should be called by data session, otherwise, data loss will be occurred)

Location:
kraken-logdb/src/main/java/org/krakenapps/logdb
Files:
2 edited

Legend:

Unmodified
Added
Removed
  • kraken-logdb/src/main/java/org/krakenapps/logdb/impl/MapReduceRpcService.java

    r532 r534  
    222222        } 
    223223 
     224        @RpcMethod(name = "eof") 
     225        public void eof(String queryGuid) { 
     226                // TODO: check if all mapper queries are ended 
     227                // for now, send eof if one mapper query is ended 
     228                RpcSession session = RpcContext.getSession(); 
     229                String nodeGuid = session.getConnection().getPeerGuid(); 
     230 
     231                Rpc rpc = rpcFromMap.get(queryGuid); 
     232                if (rpc != null) 
     233                        rpc.eof(); 
     234                else 
     235                        logger.warn("kraken logdb: rpcfrom not found for mapreduce query [{}]", queryGuid); 
     236        } 
     237 
    224238        @RpcMethod(name = "createMapQuery") 
    225239        public int createMapQuery(String queryGuid, String query) { 
     
    306320                        if (q != null) 
    307321                                q.setEnd(true); 
    308  
    309                         // TODO: check if all mapper queries are ended 
    310                         // for now, send eof if one mapper query is ended 
    311                         String queryGuid = remoteQueryMappings.get(key); 
    312                         Rpc rpc = rpcFromMap.get(queryGuid); 
    313                         if (rpc != null) 
    314                                 rpc.eof(); 
    315                         else 
    316                                 logger.warn("kraken logdb: rpcfrom not found for mapreduce query [{}]", queryGuid); 
    317322                } 
    318323                        break; 
  • kraken-logdb/src/main/java/org/krakenapps/logdb/query/command/Rpc.java

    r532 r534  
    1919 
    2020        /** 
    21          * dist query guid 
     21         * mapreduce query guid 
    2222         */ 
    2323        private String guid; 
     
    2727        private RpcConnection datastream; 
    2828        private RpcSession datasession; 
     29        private boolean end; 
    2930 
    3031        public Rpc(String agentGuid, RpcConnection upstream, String guid, boolean sender) { 
     
    3738        @Override 
    3839        public void push(Map<String, Object> m) { 
     40                if (end) { 
     41                        logger.info("kraken logdb: loss (will be fixed) - {}", Primitive.stringify(m)); 
     42                        return; 
     43                } 
     44 
    3945                if (sender) { 
    4046                        if (datastream == null) { 
     
    7783        public void eof() { 
    7884                if (datasession != null) { 
     85                        try { 
     86                                datasession.call("eof", guid); 
     87                        } catch (Exception e) { 
     88                                logger.error("kraken logdb: eof fail for mapreduce query " + guid, e); 
     89                        } 
     90 
    7991                        datastream.close(); 
    8092                        datastream = null; 
     
    8496                logger.info("kraken logdb: closed rpc data stream for query guid [{}]", guid); 
    8597                super.eof(); 
     98                end = true; 
    8699        } 
    87100 
Note: See TracChangeset for help on using the changeset viewer.