Changeset 532:7057fc0c1459


Ignore:
Timestamp:
11/14/11 02:31:08 (3 months ago)
Author:
xeraph
Branch:
default
Message:

added mapreduce query (not yet completed, and rpc has race condition bug)

Location:
kraken-logdb/src/main/java/org/krakenapps/logdb
Files:
3 added
1 deleted
6 edited

Legend:

Unmodified
Added
Removed
  • kraken-logdb/src/main/java/org/krakenapps/logdb/impl/LogDBScript.java

    r530 r532  
    215215 
    216216                context.println(q); 
     217 
     218                LogQuery lq = q.getReduceQuery().getQuery(); 
     219                do { 
     220                        try { 
     221                                Thread.sleep(100); 
     222                        } catch (InterruptedException e) { 
     223                        } 
     224                } while (!lq.isEnd()); 
     225 
     226                List<Map<String, Object>> results = lq.getResult(); 
     227                for (Map<String, Object> m : results) 
     228                        printMap(m); 
     229                ((FileBufferList<Map<String, Object>>) results).close(); 
     230 
     231                qs.removeQuery(lq.getId()); 
    217232        } 
    218233 
  • kraken-logdb/src/main/java/org/krakenapps/logdb/impl/MapReduceRpcService.java

    r530 r532  
    99import java.util.List; 
    1010import java.util.Map; 
     11import java.util.UUID; 
    1112import java.util.concurrent.ConcurrentHashMap; 
    1213import java.util.concurrent.ConcurrentMap; 
     
    1819import org.apache.felix.ipojo.annotations.ServiceProperty; 
    1920import org.apache.felix.ipojo.annotations.Validate; 
     21import org.krakenapps.api.Primitive; 
    2022import org.krakenapps.bnf.Binding; 
    2123import org.krakenapps.bnf.Syntax; 
     
    2426import org.krakenapps.logdb.DataSourceRegistry; 
    2527import org.krakenapps.logdb.LogQuery; 
     28import org.krakenapps.logdb.LogQueryCommand; 
    2629import org.krakenapps.logdb.LogQueryEventListener; 
    2730import org.krakenapps.logdb.LogQueryService; 
    2831import org.krakenapps.logdb.LogQueryStatus; 
    2932import org.krakenapps.logdb.SyntaxProvider; 
     33import org.krakenapps.logdb.mapreduce.MapQuery; 
    3034import org.krakenapps.logdb.mapreduce.MapReduceQueryStatus; 
    3135import org.krakenapps.logdb.mapreduce.MapReduceService; 
     36import org.krakenapps.logdb.mapreduce.ReduceQuery; 
     37import org.krakenapps.logdb.mapreduce.RemoteMapQuery; 
    3238import org.krakenapps.logdb.mapreduce.RemoteQuery; 
    3339import org.krakenapps.logdb.mapreduce.RemoteQueryKey; 
     40import org.krakenapps.logdb.query.LogQueryImpl; 
    3441import org.krakenapps.logdb.query.StringPlaceholder; 
    3542import org.krakenapps.logdb.query.command.Rpc; 
     
    4047import org.krakenapps.rpc.RpcConnectionProperties; 
    4148import org.krakenapps.rpc.RpcContext; 
     49import org.krakenapps.rpc.RpcException; 
    4250import org.krakenapps.rpc.RpcMethod; 
    4351import org.krakenapps.rpc.RpcSession; 
     
    7078        private String name; 
    7179 
     80        /** 
     81         * mapreduce query guid to rpcfrom mappings 
     82         */ 
    7283        private ConcurrentMap<String, Rpc> rpcFromMap; 
     84 
     85        /** 
     86         * mapreduce query guid to rpcto mappings 
     87         */ 
    7388        private ConcurrentMap<String, Rpc> rpcToMap; 
     89 
     90        /** 
     91         * rpcfrom command parser 
     92         */ 
    7493        private RpcFromParser rpcFromParser; 
     94 
     95        /** 
     96         * rpcto command parser 
     97         */ 
    7598        private RpcToParser rpcToParser; 
    7699 
     100        /** 
     101         * collected remote node's recent query statuses 
     102         */ 
    77103        private ConcurrentMap<RemoteQueryKey, RemoteQuery> remoteQueries; 
     104 
     105        /** 
     106         * search node connections 
     107         */ 
    78108        private ConcurrentMap<String, RpcConnection> upstreams; 
     109 
     110        /** 
     111         * connected from remote nodes. they push data source notifications, query 
     112         * status changes, and log data using separate data connection 
     113         */ 
    79114        private ConcurrentMap<String, RpcConnection> downstreams; 
     115 
     116        /** 
     117         * mapreduce query guid to map query requests (from remote node) 
     118         */ 
     119        private ConcurrentMap<String, MapQuery> mapQueries; 
     120 
     121        /** 
     122         * mapreduce query guid to local waiting reduce queries 
     123         */ 
     124        private ConcurrentMap<String, ReduceQuery> reduceQueries; 
     125 
     126        /** 
     127         * remote map query to mapreduce query guid relation 
     128         */ 
     129        private ConcurrentMap<RemoteQueryKey, String> remoteQueryMappings; 
    80130 
    81131        public MapReduceRpcService() { 
     
    89139                upstreams = new ConcurrentHashMap<String, RpcConnection>(); 
    90140                downstreams = new ConcurrentHashMap<String, RpcConnection>(); 
     141                mapQueries = new ConcurrentHashMap<String, MapQuery>(); 
     142                reduceQueries = new ConcurrentHashMap<String, ReduceQuery>(); 
    91143                remoteQueries = new ConcurrentHashMap<RemoteQueryKey, RemoteQuery>(); 
     144                remoteQueryMappings = new ConcurrentHashMap<RemoteQueryKey, String>(); 
    92145 
    93146                rpcFromParser = new RpcFromParser(); 
     147                rpcToParser = new RpcToParser(); 
    94148                syntaxProvider.addParsers(Arrays.asList(rpcFromParser, rpcToParser)); 
    95149                dataSourceRegistry.addListener(this); 
     
    116170                public Object parse(Binding b) { 
    117171                        String guid = (String) b.getChildren()[1].getValue(); 
    118                         Rpc rpc = new Rpc(guid, false); 
     172                        Rpc rpc = new Rpc(agent.getGuid(), null, guid, false); 
    119173                        rpcFromMap.put(guid, rpc); 
    120174                        return rpc; 
     
    132186                public Object parse(Binding b) { 
    133187                        String guid = (String) b.getChildren()[1].getValue(); 
    134                         Rpc rpc = new Rpc(guid, true); 
     188                        MapQuery mq = mapQueries.get(guid); 
     189                        Rpc rpc = new Rpc(agent.getGuid(), mq.getConnection(), guid, true); 
    135190                        rpcToMap.put(guid, rpc); 
    136191                        return rpc; 
     
    146201        public Rpc getRpcTo(String guid) { 
    147202                return rpcToMap.get(guid); 
     203        } 
     204 
     205        @RpcMethod(name = "setLogStream") 
     206        public void setLogStream(String guid) { 
     207                RpcSession session = RpcContext.getSession(); 
     208                session.setProperty("guid", guid); 
     209        } 
     210 
     211        @RpcMethod(name = "push") 
     212        public void push(Map<String, Object> data) { 
     213                RpcSession session = RpcContext.getSession(); 
     214                String queryGuid = (String) session.getProperty("guid"); 
     215                Rpc rpc = rpcFromMap.get(queryGuid); 
     216                rpc.push(data); 
     217 
     218                if (logger.isDebugEnabled()) { 
     219                        String s = Primitive.stringify(data); 
     220                        logger.debug("kraken logdb: pushed [{}] data [{}]", queryGuid, s); 
     221                } 
     222        } 
     223 
     224        @RpcMethod(name = "createMapQuery") 
     225        public int createMapQuery(String queryGuid, String query) { 
     226                try { 
     227                        RpcSession session = RpcContext.getSession(); 
     228                        String guid = session.getConnection().getPeerGuid(); 
     229 
     230                        // map query should be set before rpc command parsing 
     231                        MapQuery mq = new MapQuery(guid, session.getConnection()); 
     232                        mapQueries.put(queryGuid, mq); 
     233                        mq.setQuery(queryService.createQuery(query)); 
     234 
     235                        logger.info("kraken logdb: created map query [{}]", queryGuid); 
     236                        return mq.getQuery().getId(); 
     237                } catch (Exception e) { 
     238                        logger.error("kraken logdb: cannot create map query", e); 
     239                        throw new RpcException(e.getMessage()); 
     240                } 
     241        } 
     242 
     243        @RpcMethod(name = "startMapQuery") 
     244        public void startMapQuery(String queryGuid) { 
     245                MapQuery mq = mapQueries.get(queryGuid); 
     246                if (mq == null) 
     247                        throw new RpcException("mapreduce query not found: " + queryGuid); 
     248 
     249                queryService.startQuery(mq.getQuery().getId()); 
     250                logger.info("kraken logdb: started map query [{}]", queryGuid); 
     251        } 
     252 
     253        @RpcMethod(name = "removeMapQuery") 
     254        public void removeMapQuery(String queryGuid) { 
     255                MapQuery mq = mapQueries.remove(queryGuid); 
     256                if (mq == null) 
     257                        throw new RpcException("mapreduce query not found: " + queryGuid); 
     258 
     259                logger.info("kraken logdb: removed map query [{}]", queryGuid); 
    148260        } 
    149261 
     
    168280                RemoteQueryKey key = new RemoteQueryKey(guid, queryId); 
    169281                LogQueryStatus status = LogQueryStatus.valueOf(action); 
     282 
     283                logger.info("kraken logdb: query status change [{}, status={}]", key, status); 
    170284 
    171285                switch (status) { 
     
    192306                        if (q != null) 
    193307                                q.setEnd(true); 
     308 
     309                        // TODO: check if all mapper queries are ended 
     310                        // for now, send eof if one mapper query is ended 
     311                        String queryGuid = remoteQueryMappings.get(key); 
     312                        Rpc rpc = rpcFromMap.get(queryGuid); 
     313                        if (rpc != null) 
     314                                rpc.eof(); 
     315                        else 
     316                                logger.warn("kraken logdb: rpcfrom not found for mapreduce query [{}]", queryGuid); 
    194317                } 
    195318                        break; 
     
    210333 
    211334        @Override 
    212         public MapReduceQueryStatus createQuery(String query) { 
    213                 return null; 
     335        public MapReduceQueryStatus createQuery(String queryString) { 
     336                String queryGuid = UUID.randomUUID().toString(); 
     337                LogQuery lq = new LogQueryImpl(syntaxProvider, queryString); 
     338 
     339                boolean foundReducer = false; 
     340                List<LogQueryCommand> mapCommands = new ArrayList<LogQueryCommand>(); 
     341                List<LogQueryCommand> reduceCommands = new ArrayList<LogQueryCommand>(); 
     342 
     343                for (LogQueryCommand c : lq.getCommands()) { 
     344                        if (c.isReducer()) 
     345                                foundReducer = true; 
     346 
     347                        if (foundReducer) 
     348                                reduceCommands.add(c); 
     349                        else 
     350                                mapCommands.add(c); 
     351                } 
     352 
     353                String mapQueryString = buildQueryString(mapCommands); 
     354                String reduceQueryString = buildQueryString(reduceCommands); 
     355 
     356                mapQueryString = mapQueryString + "|rpcto " + queryGuid; 
     357                reduceQueryString = "rpcfrom " + queryGuid + "|" + reduceQueryString; 
     358 
     359                logger.info("kraken logdb: map query [{}]", mapQueryString); 
     360                logger.info("kraken logdb: reduce query [{}]", reduceQueryString); 
     361 
     362                // create map queries 
     363                List<RemoteMapQuery> mapQueries = new ArrayList<RemoteMapQuery>(); 
     364                for (RpcConnection c : downstreams.values()) { 
     365                        RpcSession session = null; 
     366                        try { 
     367                                session = c.createSession("logdb-mapreduce"); 
     368                                int id = (Integer) session.call("createMapQuery", queryGuid, mapQueryString); 
     369                                mapQueries.add(new RemoteMapQuery(queryGuid, c.getPeerGuid(), id)); 
     370                                remoteQueryMappings.put(new RemoteQueryKey(c.getPeerGuid(), id), queryGuid); 
     371                        } catch (Exception e) { 
     372                                logger.error("kraken logdb: cannot create mapquery", e); 
     373                        } finally { 
     374                                if (session != null) 
     375                                        session.close(); 
     376                        } 
     377                } 
     378 
     379                // create and start reduce query 
     380                LogQuery q = queryService.createQuery(reduceQueryString); 
     381                ReduceQuery r = new ReduceQuery(queryGuid, q); 
     382                reduceQueries.put(queryGuid, r); 
     383                queryService.startQuery(q.getId()); 
     384 
     385                // start map queries (pumping) 
     386                for (RpcConnection c : downstreams.values()) { 
     387                        RpcSession session = null; 
     388                        try { 
     389                                session = c.createSession("logdb-mapreduce"); 
     390                                session.call("startMapQuery", queryGuid); 
     391                        } catch (Exception e) { 
     392                                logger.error("kraken logdb: cannot start mapquery", e); 
     393                        } finally { 
     394                                if (session != null) 
     395                                        session.close(); 
     396                        } 
     397                } 
     398 
     399                return new MapReduceQueryStatus(queryGuid, queryString, mapQueries, r); 
     400        } 
     401 
     402        private String buildQueryString(List<LogQueryCommand> commands) { 
     403                StringBuilder sb = new StringBuilder(); 
     404                int i = 0; 
     405                for (LogQueryCommand c : commands) { 
     406                        if (i++ != 0) 
     407                                sb.append("|"); 
     408 
     409                        sb.append(c.getQueryString()); 
     410                } 
     411 
     412                return sb.toString(); 
    214413        } 
    215414 
     
    247446        public void sessionOpened(RpcSessionEvent e) { 
    248447                RpcConnection conn = e.getSession().getConnection(); 
    249                 if (!downstreams.containsKey(conn.getPeerGuid())) { 
     448                if (!upstreams.containsKey(conn.getPeerGuid()) && !downstreams.containsKey(conn.getPeerGuid())) { 
    250449                        downstreams.put(conn.getPeerGuid(), conn); 
    251450                        logger.info("kraken logdb: downstream connection [{}] opened", conn); 
     
    277476 
    278477                                upstreams.put(conn.getPeerGuid(), conn); 
     478                                conn.bind("logdb-mapreduce", this); 
    279479 
    280480                                for (DataSource ds : dataSourceRegistry.getAll()) 
  • kraken-logdb/src/main/java/org/krakenapps/logdb/mapreduce/MapReduceQueryStatus.java

    r529 r532  
    55public class MapReduceQueryStatus { 
    66        private String guid; 
    7         private String owner; 
    87        private String query; 
    9         private List<MapReduceQuery> mapperQueries; 
    10         private MapReduceQuery reducerQuery; 
     8        private List<RemoteMapQuery> mapQueries; 
     9        private ReduceQuery reduceQuery; 
    1110 
    12         public MapReduceQueryStatus(String guid, String owner, String query) { 
     11        public MapReduceQueryStatus(String guid, String query, List<RemoteMapQuery> mapQueries, ReduceQuery reduceQuery) { 
    1312                this.guid = guid; 
    14                 this.owner = owner; 
    1513                this.query = query; 
     14                this.mapQueries = mapQueries; 
     15                this.reduceQuery = reduceQuery; 
    1616        } 
    1717 
     
    2020        } 
    2121 
    22         public String getOwner() { 
    23                 return owner; 
    24         } 
    25  
    2622        public String getQuery() { 
    2723                return query; 
    2824        } 
    2925 
    30         public List<MapReduceQuery> getMapperQueries() { 
    31                 return mapperQueries; 
     26        public List<RemoteMapQuery> getMapQueries() { 
     27                return mapQueries; 
    3228        } 
    3329 
    34         public void setMapperQueries(List<MapReduceQuery> mapperQueries) { 
    35                 this.mapperQueries = mapperQueries; 
     30        public void setMapQueries(List<RemoteMapQuery> mapQueries) { 
     31                this.mapQueries = mapQueries; 
    3632        } 
    3733 
    38         public MapReduceQuery getReducerQuery() { 
    39                 return reducerQuery; 
     34        public ReduceQuery getReduceQuery() { 
     35                return reduceQuery; 
    4036        } 
    4137 
    42         public void setReducerQuery(MapReduceQuery reducerQuery) { 
    43                 this.reducerQuery = reducerQuery; 
     38        public void setReduceQuery(ReduceQuery reduceQuery) { 
     39                this.reduceQuery = reduceQuery; 
    4440        } 
    4541 
    4642        @Override 
    4743        public String toString() { 
    48                 return "arbiter query status: guid=" + guid + ", owner=" + owner + "query=" + query; 
     44                return "mapreduce query status: guid=" + guid + ", query=" + query; 
    4945        } 
    5046 
  • kraken-logdb/src/main/java/org/krakenapps/logdb/query/LogQueryImpl.java

    r457 r532  
    3030import org.krakenapps.logdb.LogQueryCallback; 
    3131import org.krakenapps.logdb.LogQueryCommand; 
    32 import org.krakenapps.logdb.LogQueryService; 
    3332import org.krakenapps.logdb.SyntaxProvider; 
    34 import org.krakenapps.logstorage.LogStorage; 
    35 import org.krakenapps.logstorage.LogTableRegistry; 
    3633import org.krakenapps.logdb.LogQueryCommand.Status; 
    3734import org.krakenapps.logdb.LogTimelineCallback; 
     
    5148        private Set<LogTimelineCallback> timelineCallbacks = new HashSet<LogTimelineCallback>(); 
    5249 
    53         public LogQueryImpl(SyntaxProvider syntaxProvider, LogQueryService service, LogStorage logStorage, 
    54                         LogTableRegistry tableRegistry, String query) { 
    55                 this.queryString = query; 
     50        public LogQueryImpl(SyntaxProvider syntaxProvider, String queryString) { 
     51                this.queryString = queryString; 
    5652 
    5753                for (String q : queryString.split("\\|")) { 
  • kraken-logdb/src/main/java/org/krakenapps/logdb/query/LogQueryServiceImpl.java

    r529 r532  
    109109        @Override 
    110110        public LogQuery createQuery(String query) { 
    111                 LogQuery lq = new LogQueryImpl(syntaxProvider, this, logStorage, tableRegistry, query); 
     111                LogQuery lq = new LogQueryImpl(syntaxProvider, query); 
    112112                queries.put(lq.getId(), lq); 
    113113                lq.registerQueryCallback(new EofReceiver(lq)); 
  • kraken-logdb/src/main/java/org/krakenapps/logdb/query/command/Rpc.java

    r526 r532  
    22 
    33import java.util.Map; 
    4  
     4import org.krakenapps.api.Primitive; 
    55import org.krakenapps.logdb.LogQueryCommand; 
     6import org.krakenapps.rpc.RpcClient; 
     7import org.krakenapps.rpc.RpcConnection; 
     8import org.krakenapps.rpc.RpcConnectionProperties; 
     9import org.krakenapps.rpc.RpcException; 
     10import org.krakenapps.rpc.RpcSession; 
     11import org.slf4j.Logger; 
     12import org.slf4j.LoggerFactory; 
    613 
    714public class Rpc extends LogQueryCommand { 
     15        private final Logger logger = LoggerFactory.getLogger(Rpc.class.getName()); 
     16 
     17        private String agentGuid; 
     18        private RpcConnection upstream; 
     19 
    820        /** 
    921         * dist query guid 
     
    1325        private boolean sender; 
    1426 
    15         public Rpc(String guid, boolean sender) { 
     27        private RpcConnection datastream; 
     28        private RpcSession datasession; 
     29 
     30        public Rpc(String agentGuid, RpcConnection upstream, String guid, boolean sender) { 
     31                this.agentGuid = agentGuid; 
     32                this.upstream = upstream; 
    1633                this.guid = guid; 
    1734                this.sender = sender; 
     
    2037        @Override 
    2138        public void push(Map<String, Object> m) { 
    22                 write(m); 
     39                if (sender) { 
     40                        if (datastream == null) { 
     41                                RpcClient client = new RpcClient(agentGuid); 
     42                                RpcConnectionProperties props = new RpcConnectionProperties(upstream.getRemoteAddress()); 
     43                                props.setPassword((String) upstream.getProperty("password")); 
     44                                datastream = client.connect(props); 
     45                                try { 
     46                                        datasession = datastream.createSession("logdb-mapreduce"); 
     47                                        datasession.call("setLogStream", guid); 
     48                                } catch (RpcException e) { 
     49                                        logger.error("kraken logdb: cannot set log stream", e); 
     50                                } catch (InterruptedException e) { 
     51                                        logger.error("kraken logdb: cannot set log stream", e); 
     52                                } 
     53 
     54                                logger.info("kraken logdb: opened rpc data stream for query guid [{}]", guid); 
     55                        } 
     56 
     57                        datasession.post("push", m); 
     58                } else { 
     59                        write(m); 
     60                } 
     61 
     62                if (logger.isDebugEnabled()) 
     63                        logger.debug("kraken logdb: rpc [{}]", Primitive.stringify(m)); 
    2364        } 
    2465 
     
    3475 
    3576        @Override 
     77        public void eof() { 
     78                if (datasession != null) { 
     79                        datastream.close(); 
     80                        datastream = null; 
     81                        datasession = null; 
     82                } 
     83 
     84                logger.info("kraken logdb: closed rpc data stream for query guid [{}]", guid); 
     85                super.eof(); 
     86        } 
     87 
     88        @Override 
    3689        public String toString() { 
    3790                return "RPC " + (sender ? "Output" : "Input") + guid; 
Note: See TracChangeset for help on using the changeset viewer.