Ignore:
Timestamp:
11/14/11 02:31:08 (6 months ago)
Author:
xeraph
Branch:
default
Message:

added mapreduce query (not yet completed, and rpc has race condition bug)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • kraken-logdb/src/main/java/org/krakenapps/logdb/impl/MapReduceRpcService.java

    r530 r532  
    99import java.util.List; 
    1010import java.util.Map; 
     11import java.util.UUID; 
    1112import java.util.concurrent.ConcurrentHashMap; 
    1213import java.util.concurrent.ConcurrentMap; 
     
    1819import org.apache.felix.ipojo.annotations.ServiceProperty; 
    1920import org.apache.felix.ipojo.annotations.Validate; 
     21import org.krakenapps.api.Primitive; 
    2022import org.krakenapps.bnf.Binding; 
    2123import org.krakenapps.bnf.Syntax; 
     
    2426import org.krakenapps.logdb.DataSourceRegistry; 
    2527import org.krakenapps.logdb.LogQuery; 
     28import org.krakenapps.logdb.LogQueryCommand; 
    2629import org.krakenapps.logdb.LogQueryEventListener; 
    2730import org.krakenapps.logdb.LogQueryService; 
    2831import org.krakenapps.logdb.LogQueryStatus; 
    2932import org.krakenapps.logdb.SyntaxProvider; 
     33import org.krakenapps.logdb.mapreduce.MapQuery; 
    3034import org.krakenapps.logdb.mapreduce.MapReduceQueryStatus; 
    3135import org.krakenapps.logdb.mapreduce.MapReduceService; 
     36import org.krakenapps.logdb.mapreduce.ReduceQuery; 
     37import org.krakenapps.logdb.mapreduce.RemoteMapQuery; 
    3238import org.krakenapps.logdb.mapreduce.RemoteQuery; 
    3339import org.krakenapps.logdb.mapreduce.RemoteQueryKey; 
     40import org.krakenapps.logdb.query.LogQueryImpl; 
    3441import org.krakenapps.logdb.query.StringPlaceholder; 
    3542import org.krakenapps.logdb.query.command.Rpc; 
     
    4047import org.krakenapps.rpc.RpcConnectionProperties; 
    4148import org.krakenapps.rpc.RpcContext; 
     49import org.krakenapps.rpc.RpcException; 
    4250import org.krakenapps.rpc.RpcMethod; 
    4351import org.krakenapps.rpc.RpcSession; 
     
    7078        private String name; 
    7179 
     80        /** 
     81         * mapreduce query guid to rpcfrom mappings 
     82         */ 
    7283        private ConcurrentMap<String, Rpc> rpcFromMap; 
     84 
     85        /** 
     86         * mapreduce query guid to rpcto mappings 
     87         */ 
    7388        private ConcurrentMap<String, Rpc> rpcToMap; 
     89 
     90        /** 
     91         * rpcfrom command parser 
     92         */ 
    7493        private RpcFromParser rpcFromParser; 
     94 
     95        /** 
     96         * rpcto command parser 
     97         */ 
    7598        private RpcToParser rpcToParser; 
    7699 
     100        /** 
     101         * collected remote node's recent query statuses 
     102         */ 
    77103        private ConcurrentMap<RemoteQueryKey, RemoteQuery> remoteQueries; 
     104 
     105        /** 
     106         * search node connections 
     107         */ 
    78108        private ConcurrentMap<String, RpcConnection> upstreams; 
     109 
     110        /** 
     111         * connected from remote nodes. they push data source notifications, query 
     112         * status changes, and log data using separate data connection 
     113         */ 
    79114        private ConcurrentMap<String, RpcConnection> downstreams; 
     115 
     116        /** 
     117         * mapreduce query guid to map query requests (from remote node) 
     118         */ 
     119        private ConcurrentMap<String, MapQuery> mapQueries; 
     120 
     121        /** 
     122         * mapreduce query guid to local waiting reduce queries 
     123         */ 
     124        private ConcurrentMap<String, ReduceQuery> reduceQueries; 
     125 
     126        /** 
     127         * remote map query to mapreduce query guid relation 
     128         */ 
     129        private ConcurrentMap<RemoteQueryKey, String> remoteQueryMappings; 
    80130 
    81131        public MapReduceRpcService() { 
     
    89139                upstreams = new ConcurrentHashMap<String, RpcConnection>(); 
    90140                downstreams = new ConcurrentHashMap<String, RpcConnection>(); 
     141                mapQueries = new ConcurrentHashMap<String, MapQuery>(); 
     142                reduceQueries = new ConcurrentHashMap<String, ReduceQuery>(); 
    91143                remoteQueries = new ConcurrentHashMap<RemoteQueryKey, RemoteQuery>(); 
     144                remoteQueryMappings = new ConcurrentHashMap<RemoteQueryKey, String>(); 
    92145 
    93146                rpcFromParser = new RpcFromParser(); 
     147                rpcToParser = new RpcToParser(); 
    94148                syntaxProvider.addParsers(Arrays.asList(rpcFromParser, rpcToParser)); 
    95149                dataSourceRegistry.addListener(this); 
     
    116170                public Object parse(Binding b) { 
    117171                        String guid = (String) b.getChildren()[1].getValue(); 
    118                         Rpc rpc = new Rpc(guid, false); 
     172                        Rpc rpc = new Rpc(agent.getGuid(), null, guid, false); 
    119173                        rpcFromMap.put(guid, rpc); 
    120174                        return rpc; 
     
    132186                public Object parse(Binding b) { 
    133187                        String guid = (String) b.getChildren()[1].getValue(); 
    134                         Rpc rpc = new Rpc(guid, true); 
     188                        MapQuery mq = mapQueries.get(guid); 
     189                        Rpc rpc = new Rpc(agent.getGuid(), mq.getConnection(), guid, true); 
    135190                        rpcToMap.put(guid, rpc); 
    136191                        return rpc; 
     
    146201        public Rpc getRpcTo(String guid) { 
    147202                return rpcToMap.get(guid); 
     203        } 
     204 
     205        @RpcMethod(name = "setLogStream") 
     206        public void setLogStream(String guid) { 
     207                RpcSession session = RpcContext.getSession(); 
     208                session.setProperty("guid", guid); 
     209        } 
     210 
     211        @RpcMethod(name = "push") 
     212        public void push(Map<String, Object> data) { 
     213                RpcSession session = RpcContext.getSession(); 
     214                String queryGuid = (String) session.getProperty("guid"); 
     215                Rpc rpc = rpcFromMap.get(queryGuid); 
     216                rpc.push(data); 
     217 
     218                if (logger.isDebugEnabled()) { 
     219                        String s = Primitive.stringify(data); 
     220                        logger.debug("kraken logdb: pushed [{}] data [{}]", queryGuid, s); 
     221                } 
     222        } 
     223 
     224        @RpcMethod(name = "createMapQuery") 
     225        public int createMapQuery(String queryGuid, String query) { 
     226                try { 
     227                        RpcSession session = RpcContext.getSession(); 
     228                        String guid = session.getConnection().getPeerGuid(); 
     229 
     230                        // map query should be set before rpc command parsing 
     231                        MapQuery mq = new MapQuery(guid, session.getConnection()); 
     232                        mapQueries.put(queryGuid, mq); 
     233                        mq.setQuery(queryService.createQuery(query)); 
     234 
     235                        logger.info("kraken logdb: created map query [{}]", queryGuid); 
     236                        return mq.getQuery().getId(); 
     237                } catch (Exception e) { 
     238                        logger.error("kraken logdb: cannot create map query", e); 
     239                        throw new RpcException(e.getMessage()); 
     240                } 
     241        } 
     242 
     243        @RpcMethod(name = "startMapQuery") 
     244        public void startMapQuery(String queryGuid) { 
     245                MapQuery mq = mapQueries.get(queryGuid); 
     246                if (mq == null) 
     247                        throw new RpcException("mapreduce query not found: " + queryGuid); 
     248 
     249                queryService.startQuery(mq.getQuery().getId()); 
     250                logger.info("kraken logdb: started map query [{}]", queryGuid); 
     251        } 
     252 
     253        @RpcMethod(name = "removeMapQuery") 
     254        public void removeMapQuery(String queryGuid) { 
     255                MapQuery mq = mapQueries.remove(queryGuid); 
     256                if (mq == null) 
     257                        throw new RpcException("mapreduce query not found: " + queryGuid); 
     258 
     259                logger.info("kraken logdb: removed map query [{}]", queryGuid); 
    148260        } 
    149261 
     
    168280                RemoteQueryKey key = new RemoteQueryKey(guid, queryId); 
    169281                LogQueryStatus status = LogQueryStatus.valueOf(action); 
     282 
     283                logger.info("kraken logdb: query status change [{}, status={}]", key, status); 
    170284 
    171285                switch (status) { 
     
    192306                        if (q != null) 
    193307                                q.setEnd(true); 
     308 
     309                        // TODO: check if all mapper queries are ended 
     310                        // for now, send eof if one mapper query is ended 
     311                        String queryGuid = remoteQueryMappings.get(key); 
     312                        Rpc rpc = rpcFromMap.get(queryGuid); 
     313                        if (rpc != null) 
     314                                rpc.eof(); 
     315                        else 
     316                                logger.warn("kraken logdb: rpcfrom not found for mapreduce query [{}]", queryGuid); 
    194317                } 
    195318                        break; 
     
    210333 
    211334        @Override 
    212         public MapReduceQueryStatus createQuery(String query) { 
    213                 return null; 
     335        public MapReduceQueryStatus createQuery(String queryString) { 
     336                String queryGuid = UUID.randomUUID().toString(); 
     337                LogQuery lq = new LogQueryImpl(syntaxProvider, queryString); 
     338 
     339                boolean foundReducer = false; 
     340                List<LogQueryCommand> mapCommands = new ArrayList<LogQueryCommand>(); 
     341                List<LogQueryCommand> reduceCommands = new ArrayList<LogQueryCommand>(); 
     342 
     343                for (LogQueryCommand c : lq.getCommands()) { 
     344                        if (c.isReducer()) 
     345                                foundReducer = true; 
     346 
     347                        if (foundReducer) 
     348                                reduceCommands.add(c); 
     349                        else 
     350                                mapCommands.add(c); 
     351                } 
     352 
     353                String mapQueryString = buildQueryString(mapCommands); 
     354                String reduceQueryString = buildQueryString(reduceCommands); 
     355 
     356                mapQueryString = mapQueryString + "|rpcto " + queryGuid; 
     357                reduceQueryString = "rpcfrom " + queryGuid + "|" + reduceQueryString; 
     358 
     359                logger.info("kraken logdb: map query [{}]", mapQueryString); 
     360                logger.info("kraken logdb: reduce query [{}]", reduceQueryString); 
     361 
     362                // create map queries 
     363                List<RemoteMapQuery> mapQueries = new ArrayList<RemoteMapQuery>(); 
     364                for (RpcConnection c : downstreams.values()) { 
     365                        RpcSession session = null; 
     366                        try { 
     367                                session = c.createSession("logdb-mapreduce"); 
     368                                int id = (Integer) session.call("createMapQuery", queryGuid, mapQueryString); 
     369                                mapQueries.add(new RemoteMapQuery(queryGuid, c.getPeerGuid(), id)); 
     370                                remoteQueryMappings.put(new RemoteQueryKey(c.getPeerGuid(), id), queryGuid); 
     371                        } catch (Exception e) { 
     372                                logger.error("kraken logdb: cannot create mapquery", e); 
     373                        } finally { 
     374                                if (session != null) 
     375                                        session.close(); 
     376                        } 
     377                } 
     378 
     379                // create and start reduce query 
     380                LogQuery q = queryService.createQuery(reduceQueryString); 
     381                ReduceQuery r = new ReduceQuery(queryGuid, q); 
     382                reduceQueries.put(queryGuid, r); 
     383                queryService.startQuery(q.getId()); 
     384 
     385                // start map queries (pumping) 
     386                for (RpcConnection c : downstreams.values()) { 
     387                        RpcSession session = null; 
     388                        try { 
     389                                session = c.createSession("logdb-mapreduce"); 
     390                                session.call("startMapQuery", queryGuid); 
     391                        } catch (Exception e) { 
     392                                logger.error("kraken logdb: cannot start mapquery", e); 
     393                        } finally { 
     394                                if (session != null) 
     395                                        session.close(); 
     396                        } 
     397                } 
     398 
     399                return new MapReduceQueryStatus(queryGuid, queryString, mapQueries, r); 
     400        } 
     401 
     402        private String buildQueryString(List<LogQueryCommand> commands) { 
     403                StringBuilder sb = new StringBuilder(); 
     404                int i = 0; 
     405                for (LogQueryCommand c : commands) { 
     406                        if (i++ != 0) 
     407                                sb.append("|"); 
     408 
     409                        sb.append(c.getQueryString()); 
     410                } 
     411 
     412                return sb.toString(); 
    214413        } 
    215414 
     
    247446        public void sessionOpened(RpcSessionEvent e) { 
    248447                RpcConnection conn = e.getSession().getConnection(); 
    249                 if (!downstreams.containsKey(conn.getPeerGuid())) { 
     448                if (!upstreams.containsKey(conn.getPeerGuid()) && !downstreams.containsKey(conn.getPeerGuid())) { 
    250449                        downstreams.put(conn.getPeerGuid(), conn); 
    251450                        logger.info("kraken logdb: downstream connection [{}] opened", conn); 
     
    277476 
    278477                                upstreams.put(conn.getPeerGuid(), conn); 
     478                                conn.bind("logdb-mapreduce", this); 
    279479 
    280480                                for (DataSource ds : dataSourceRegistry.getAll()) 
Note: See TracChangeset for help on using the changeset viewer.