Ignore:
Timestamp:
11/14/11 11:47:19 (6 months ago)
Author:
xeraph
Branch:
default
Children:
535:c38ad740171a, 537:d8d85fe7264c
Message:

moved eof to separated rpc method (should be called by data session, otherwise, data loss will be occurred)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • kraken-logdb/src/main/java/org/krakenapps/logdb/impl/MapReduceRpcService.java

    r532 r534  
    222222        } 
    223223 
     224        @RpcMethod(name = "eof") 
     225        public void eof(String queryGuid) { 
     226                // TODO: check if all mapper queries are ended 
     227                // for now, send eof if one mapper query is ended 
     228                RpcSession session = RpcContext.getSession(); 
     229                String nodeGuid = session.getConnection().getPeerGuid(); 
     230 
     231                Rpc rpc = rpcFromMap.get(queryGuid); 
     232                if (rpc != null) 
     233                        rpc.eof(); 
     234                else 
     235                        logger.warn("kraken logdb: rpcfrom not found for mapreduce query [{}]", queryGuid); 
     236        } 
     237 
    224238        @RpcMethod(name = "createMapQuery") 
    225239        public int createMapQuery(String queryGuid, String query) { 
     
    306320                        if (q != null) 
    307321                                q.setEnd(true); 
    308  
    309                         // TODO: check if all mapper queries are ended 
    310                         // for now, send eof if one mapper query is ended 
    311                         String queryGuid = remoteQueryMappings.get(key); 
    312                         Rpc rpc = rpcFromMap.get(queryGuid); 
    313                         if (rpc != null) 
    314                                 rpc.eof(); 
    315                         else 
    316                                 logger.warn("kraken logdb: rpcfrom not found for mapreduce query [{}]", queryGuid); 
    317322                } 
    318323                        break; 
Note: See TracChangeset for help on using the changeset viewer.