Changeset 532:7057fc0c1459 for kraken-logdb/src/main/java/org/krakenapps/logdb/impl/MapReduceRpcService.java
Legend:
- Unmodified
- Added
- Removed
-
kraken-logdb/src/main/java/org/krakenapps/logdb/impl/MapReduceRpcService.java
r530 r532 9 9 import java.util.List; 10 10 import java.util.Map; 11 import java.util.UUID; 11 12 import java.util.concurrent.ConcurrentHashMap; 12 13 import java.util.concurrent.ConcurrentMap; … … 18 19 import org.apache.felix.ipojo.annotations.ServiceProperty; 19 20 import org.apache.felix.ipojo.annotations.Validate; 21 import org.krakenapps.api.Primitive; 20 22 import org.krakenapps.bnf.Binding; 21 23 import org.krakenapps.bnf.Syntax; … … 24 26 import org.krakenapps.logdb.DataSourceRegistry; 25 27 import org.krakenapps.logdb.LogQuery; 28 import org.krakenapps.logdb.LogQueryCommand; 26 29 import org.krakenapps.logdb.LogQueryEventListener; 27 30 import org.krakenapps.logdb.LogQueryService; 28 31 import org.krakenapps.logdb.LogQueryStatus; 29 32 import org.krakenapps.logdb.SyntaxProvider; 33 import org.krakenapps.logdb.mapreduce.MapQuery; 30 34 import org.krakenapps.logdb.mapreduce.MapReduceQueryStatus; 31 35 import org.krakenapps.logdb.mapreduce.MapReduceService; 36 import org.krakenapps.logdb.mapreduce.ReduceQuery; 37 import org.krakenapps.logdb.mapreduce.RemoteMapQuery; 32 38 import org.krakenapps.logdb.mapreduce.RemoteQuery; 33 39 import org.krakenapps.logdb.mapreduce.RemoteQueryKey; 40 import org.krakenapps.logdb.query.LogQueryImpl; 34 41 import org.krakenapps.logdb.query.StringPlaceholder; 35 42 import org.krakenapps.logdb.query.command.Rpc; … … 40 47 import org.krakenapps.rpc.RpcConnectionProperties; 41 48 import org.krakenapps.rpc.RpcContext; 49 import org.krakenapps.rpc.RpcException; 42 50 import org.krakenapps.rpc.RpcMethod; 43 51 import org.krakenapps.rpc.RpcSession; … … 70 78 private String name; 71 79 80 /** 81 * mapreduce query guid to rpcfrom mappings 82 */ 72 83 private ConcurrentMap<String, Rpc> rpcFromMap; 84 85 /** 86 * mapreduce query guid to rpcto mappings 87 */ 73 88 private ConcurrentMap<String, Rpc> rpcToMap; 89 90 /** 91 * rpcfrom command parser 92 */ 74 93 private RpcFromParser rpcFromParser; 94 95 /** 96 * rpcto command parser 97 */ 75 98 private RpcToParser rpcToParser; 76 99 100 /** 101 * collected remote node's recent query statuses 102 */ 77 103 private ConcurrentMap<RemoteQueryKey, RemoteQuery> remoteQueries; 104 105 /** 106 * search node connections 107 */ 78 108 private ConcurrentMap<String, RpcConnection> upstreams; 109 110 /** 111 * connected from remote nodes. they push data source notifications, query 112 * status changes, and log data using separate data connection 113 */ 79 114 private ConcurrentMap<String, RpcConnection> downstreams; 115 116 /** 117 * mapreduce query guid to map query requests (from remote node) 118 */ 119 private ConcurrentMap<String, MapQuery> mapQueries; 120 121 /** 122 * mapreduce query guid to local waiting reduce queries 123 */ 124 private ConcurrentMap<String, ReduceQuery> reduceQueries; 125 126 /** 127 * remote map query to mapreduce query guid relation 128 */ 129 private ConcurrentMap<RemoteQueryKey, String> remoteQueryMappings; 80 130 81 131 public MapReduceRpcService() { … … 89 139 upstreams = new ConcurrentHashMap<String, RpcConnection>(); 90 140 downstreams = new ConcurrentHashMap<String, RpcConnection>(); 141 mapQueries = new ConcurrentHashMap<String, MapQuery>(); 142 reduceQueries = new ConcurrentHashMap<String, ReduceQuery>(); 91 143 remoteQueries = new ConcurrentHashMap<RemoteQueryKey, RemoteQuery>(); 144 remoteQueryMappings = new ConcurrentHashMap<RemoteQueryKey, String>(); 92 145 93 146 rpcFromParser = new RpcFromParser(); 147 rpcToParser = new RpcToParser(); 94 148 syntaxProvider.addParsers(Arrays.asList(rpcFromParser, rpcToParser)); 95 149 dataSourceRegistry.addListener(this); … … 116 170 public Object parse(Binding b) { 117 171 String guid = (String) b.getChildren()[1].getValue(); 118 Rpc rpc = new Rpc( guid, false);172 Rpc rpc = new Rpc(agent.getGuid(), null, guid, false); 119 173 rpcFromMap.put(guid, rpc); 120 174 return rpc; … … 132 186 public Object parse(Binding b) { 133 187 String guid = (String) b.getChildren()[1].getValue(); 134 Rpc rpc = new Rpc(guid, true); 188 MapQuery mq = mapQueries.get(guid); 189 Rpc rpc = new Rpc(agent.getGuid(), mq.getConnection(), guid, true); 135 190 rpcToMap.put(guid, rpc); 136 191 return rpc; … … 146 201 public Rpc getRpcTo(String guid) { 147 202 return rpcToMap.get(guid); 203 } 204 205 @RpcMethod(name = "setLogStream") 206 public void setLogStream(String guid) { 207 RpcSession session = RpcContext.getSession(); 208 session.setProperty("guid", guid); 209 } 210 211 @RpcMethod(name = "push") 212 public void push(Map<String, Object> data) { 213 RpcSession session = RpcContext.getSession(); 214 String queryGuid = (String) session.getProperty("guid"); 215 Rpc rpc = rpcFromMap.get(queryGuid); 216 rpc.push(data); 217 218 if (logger.isDebugEnabled()) { 219 String s = Primitive.stringify(data); 220 logger.debug("kraken logdb: pushed [{}] data [{}]", queryGuid, s); 221 } 222 } 223 224 @RpcMethod(name = "createMapQuery") 225 public int createMapQuery(String queryGuid, String query) { 226 try { 227 RpcSession session = RpcContext.getSession(); 228 String guid = session.getConnection().getPeerGuid(); 229 230 // map query should be set before rpc command parsing 231 MapQuery mq = new MapQuery(guid, session.getConnection()); 232 mapQueries.put(queryGuid, mq); 233 mq.setQuery(queryService.createQuery(query)); 234 235 logger.info("kraken logdb: created map query [{}]", queryGuid); 236 return mq.getQuery().getId(); 237 } catch (Exception e) { 238 logger.error("kraken logdb: cannot create map query", e); 239 throw new RpcException(e.getMessage()); 240 } 241 } 242 243 @RpcMethod(name = "startMapQuery") 244 public void startMapQuery(String queryGuid) { 245 MapQuery mq = mapQueries.get(queryGuid); 246 if (mq == null) 247 throw new RpcException("mapreduce query not found: " + queryGuid); 248 249 queryService.startQuery(mq.getQuery().getId()); 250 logger.info("kraken logdb: started map query [{}]", queryGuid); 251 } 252 253 @RpcMethod(name = "removeMapQuery") 254 public void removeMapQuery(String queryGuid) { 255 MapQuery mq = mapQueries.remove(queryGuid); 256 if (mq == null) 257 throw new RpcException("mapreduce query not found: " + queryGuid); 258 259 logger.info("kraken logdb: removed map query [{}]", queryGuid); 148 260 } 149 261 … … 168 280 RemoteQueryKey key = new RemoteQueryKey(guid, queryId); 169 281 LogQueryStatus status = LogQueryStatus.valueOf(action); 282 283 logger.info("kraken logdb: query status change [{}, status={}]", key, status); 170 284 171 285 switch (status) { … … 192 306 if (q != null) 193 307 q.setEnd(true); 308 309 // TODO: check if all mapper queries are ended 310 // for now, send eof if one mapper query is ended 311 String queryGuid = remoteQueryMappings.get(key); 312 Rpc rpc = rpcFromMap.get(queryGuid); 313 if (rpc != null) 314 rpc.eof(); 315 else 316 logger.warn("kraken logdb: rpcfrom not found for mapreduce query [{}]", queryGuid); 194 317 } 195 318 break; … … 210 333 211 334 @Override 212 public MapReduceQueryStatus createQuery(String query) { 213 return null; 335 public MapReduceQueryStatus createQuery(String queryString) { 336 String queryGuid = UUID.randomUUID().toString(); 337 LogQuery lq = new LogQueryImpl(syntaxProvider, queryString); 338 339 boolean foundReducer = false; 340 List<LogQueryCommand> mapCommands = new ArrayList<LogQueryCommand>(); 341 List<LogQueryCommand> reduceCommands = new ArrayList<LogQueryCommand>(); 342 343 for (LogQueryCommand c : lq.getCommands()) { 344 if (c.isReducer()) 345 foundReducer = true; 346 347 if (foundReducer) 348 reduceCommands.add(c); 349 else 350 mapCommands.add(c); 351 } 352 353 String mapQueryString = buildQueryString(mapCommands); 354 String reduceQueryString = buildQueryString(reduceCommands); 355 356 mapQueryString = mapQueryString + "|rpcto " + queryGuid; 357 reduceQueryString = "rpcfrom " + queryGuid + "|" + reduceQueryString; 358 359 logger.info("kraken logdb: map query [{}]", mapQueryString); 360 logger.info("kraken logdb: reduce query [{}]", reduceQueryString); 361 362 // create map queries 363 List<RemoteMapQuery> mapQueries = new ArrayList<RemoteMapQuery>(); 364 for (RpcConnection c : downstreams.values()) { 365 RpcSession session = null; 366 try { 367 session = c.createSession("logdb-mapreduce"); 368 int id = (Integer) session.call("createMapQuery", queryGuid, mapQueryString); 369 mapQueries.add(new RemoteMapQuery(queryGuid, c.getPeerGuid(), id)); 370 remoteQueryMappings.put(new RemoteQueryKey(c.getPeerGuid(), id), queryGuid); 371 } catch (Exception e) { 372 logger.error("kraken logdb: cannot create mapquery", e); 373 } finally { 374 if (session != null) 375 session.close(); 376 } 377 } 378 379 // create and start reduce query 380 LogQuery q = queryService.createQuery(reduceQueryString); 381 ReduceQuery r = new ReduceQuery(queryGuid, q); 382 reduceQueries.put(queryGuid, r); 383 queryService.startQuery(q.getId()); 384 385 // start map queries (pumping) 386 for (RpcConnection c : downstreams.values()) { 387 RpcSession session = null; 388 try { 389 session = c.createSession("logdb-mapreduce"); 390 session.call("startMapQuery", queryGuid); 391 } catch (Exception e) { 392 logger.error("kraken logdb: cannot start mapquery", e); 393 } finally { 394 if (session != null) 395 session.close(); 396 } 397 } 398 399 return new MapReduceQueryStatus(queryGuid, queryString, mapQueries, r); 400 } 401 402 private String buildQueryString(List<LogQueryCommand> commands) { 403 StringBuilder sb = new StringBuilder(); 404 int i = 0; 405 for (LogQueryCommand c : commands) { 406 if (i++ != 0) 407 sb.append("|"); 408 409 sb.append(c.getQueryString()); 410 } 411 412 return sb.toString(); 214 413 } 215 414 … … 247 446 public void sessionOpened(RpcSessionEvent e) { 248 447 RpcConnection conn = e.getSession().getConnection(); 249 if (! downstreams.containsKey(conn.getPeerGuid())) {448 if (!upstreams.containsKey(conn.getPeerGuid()) && !downstreams.containsKey(conn.getPeerGuid())) { 250 449 downstreams.put(conn.getPeerGuid(), conn); 251 450 logger.info("kraken logdb: downstream connection [{}] opened", conn); … … 277 476 278 477 upstreams.put(conn.getPeerGuid(), conn); 478 conn.bind("logdb-mapreduce", this); 279 479 280 480 for (DataSource ds : dataSourceRegistry.getAll())
Note: See TracChangeset
for help on using the changeset viewer.
