Changeset 532:7057fc0c1459
- Timestamp:
- 11/14/11 02:31:08 (3 months ago)
- Branch:
- default
- Location:
- kraken-logdb/src/main/java/org/krakenapps/logdb
- Files:
-
- 3 added
- 1 deleted
- 6 edited
-
impl/LogDBScript.java (modified) (1 diff)
-
impl/MapReduceRpcService.java (modified) (14 diffs)
-
mapreduce/MapQuery.java (added)
-
mapreduce/MapReduceQuery.java (deleted)
-
mapreduce/MapReduceQueryStatus.java (modified) (2 diffs)
-
mapreduce/ReduceQuery.java (added)
-
mapreduce/RemoteMapQuery.java (added)
-
query/LogQueryImpl.java (modified) (2 diffs)
-
query/LogQueryServiceImpl.java (modified) (1 diff)
-
query/command/Rpc.java (modified) (4 diffs)
Legend:
- Unmodified
- Added
- Removed
-
kraken-logdb/src/main/java/org/krakenapps/logdb/impl/LogDBScript.java
r530 r532 215 215 216 216 context.println(q); 217 218 LogQuery lq = q.getReduceQuery().getQuery(); 219 do { 220 try { 221 Thread.sleep(100); 222 } catch (InterruptedException e) { 223 } 224 } while (!lq.isEnd()); 225 226 List<Map<String, Object>> results = lq.getResult(); 227 for (Map<String, Object> m : results) 228 printMap(m); 229 ((FileBufferList<Map<String, Object>>) results).close(); 230 231 qs.removeQuery(lq.getId()); 217 232 } 218 233 -
kraken-logdb/src/main/java/org/krakenapps/logdb/impl/MapReduceRpcService.java
r530 r532 9 9 import java.util.List; 10 10 import java.util.Map; 11 import java.util.UUID; 11 12 import java.util.concurrent.ConcurrentHashMap; 12 13 import java.util.concurrent.ConcurrentMap; … … 18 19 import org.apache.felix.ipojo.annotations.ServiceProperty; 19 20 import org.apache.felix.ipojo.annotations.Validate; 21 import org.krakenapps.api.Primitive; 20 22 import org.krakenapps.bnf.Binding; 21 23 import org.krakenapps.bnf.Syntax; … … 24 26 import org.krakenapps.logdb.DataSourceRegistry; 25 27 import org.krakenapps.logdb.LogQuery; 28 import org.krakenapps.logdb.LogQueryCommand; 26 29 import org.krakenapps.logdb.LogQueryEventListener; 27 30 import org.krakenapps.logdb.LogQueryService; 28 31 import org.krakenapps.logdb.LogQueryStatus; 29 32 import org.krakenapps.logdb.SyntaxProvider; 33 import org.krakenapps.logdb.mapreduce.MapQuery; 30 34 import org.krakenapps.logdb.mapreduce.MapReduceQueryStatus; 31 35 import org.krakenapps.logdb.mapreduce.MapReduceService; 36 import org.krakenapps.logdb.mapreduce.ReduceQuery; 37 import org.krakenapps.logdb.mapreduce.RemoteMapQuery; 32 38 import org.krakenapps.logdb.mapreduce.RemoteQuery; 33 39 import org.krakenapps.logdb.mapreduce.RemoteQueryKey; 40 import org.krakenapps.logdb.query.LogQueryImpl; 34 41 import org.krakenapps.logdb.query.StringPlaceholder; 35 42 import org.krakenapps.logdb.query.command.Rpc; … … 40 47 import org.krakenapps.rpc.RpcConnectionProperties; 41 48 import org.krakenapps.rpc.RpcContext; 49 import org.krakenapps.rpc.RpcException; 42 50 import org.krakenapps.rpc.RpcMethod; 43 51 import org.krakenapps.rpc.RpcSession; … … 70 78 private String name; 71 79 80 /** 81 * mapreduce query guid to rpcfrom mappings 82 */ 72 83 private ConcurrentMap<String, Rpc> rpcFromMap; 84 85 /** 86 * mapreduce query guid to rpcto mappings 87 */ 73 88 private ConcurrentMap<String, Rpc> rpcToMap; 89 90 /** 91 * rpcfrom command parser 92 */ 74 93 private RpcFromParser rpcFromParser; 94 95 /** 96 * rpcto command parser 97 */ 75 98 private RpcToParser rpcToParser; 76 99 100 /** 101 * collected remote node's recent query statuses 102 */ 77 103 private ConcurrentMap<RemoteQueryKey, RemoteQuery> remoteQueries; 104 105 /** 106 * search node connections 107 */ 78 108 private ConcurrentMap<String, RpcConnection> upstreams; 109 110 /** 111 * connected from remote nodes. they push data source notifications, query 112 * status changes, and log data using separate data connection 113 */ 79 114 private ConcurrentMap<String, RpcConnection> downstreams; 115 116 /** 117 * mapreduce query guid to map query requests (from remote node) 118 */ 119 private ConcurrentMap<String, MapQuery> mapQueries; 120 121 /** 122 * mapreduce query guid to local waiting reduce queries 123 */ 124 private ConcurrentMap<String, ReduceQuery> reduceQueries; 125 126 /** 127 * remote map query to mapreduce query guid relation 128 */ 129 private ConcurrentMap<RemoteQueryKey, String> remoteQueryMappings; 80 130 81 131 public MapReduceRpcService() { … … 89 139 upstreams = new ConcurrentHashMap<String, RpcConnection>(); 90 140 downstreams = new ConcurrentHashMap<String, RpcConnection>(); 141 mapQueries = new ConcurrentHashMap<String, MapQuery>(); 142 reduceQueries = new ConcurrentHashMap<String, ReduceQuery>(); 91 143 remoteQueries = new ConcurrentHashMap<RemoteQueryKey, RemoteQuery>(); 144 remoteQueryMappings = new ConcurrentHashMap<RemoteQueryKey, String>(); 92 145 93 146 rpcFromParser = new RpcFromParser(); 147 rpcToParser = new RpcToParser(); 94 148 syntaxProvider.addParsers(Arrays.asList(rpcFromParser, rpcToParser)); 95 149 dataSourceRegistry.addListener(this); … … 116 170 public Object parse(Binding b) { 117 171 String guid = (String) b.getChildren()[1].getValue(); 118 Rpc rpc = new Rpc( guid, false);172 Rpc rpc = new Rpc(agent.getGuid(), null, guid, false); 119 173 rpcFromMap.put(guid, rpc); 120 174 return rpc; … … 132 186 public Object parse(Binding b) { 133 187 String guid = (String) b.getChildren()[1].getValue(); 134 Rpc rpc = new Rpc(guid, true); 188 MapQuery mq = mapQueries.get(guid); 189 Rpc rpc = new Rpc(agent.getGuid(), mq.getConnection(), guid, true); 135 190 rpcToMap.put(guid, rpc); 136 191 return rpc; … … 146 201 public Rpc getRpcTo(String guid) { 147 202 return rpcToMap.get(guid); 203 } 204 205 @RpcMethod(name = "setLogStream") 206 public void setLogStream(String guid) { 207 RpcSession session = RpcContext.getSession(); 208 session.setProperty("guid", guid); 209 } 210 211 @RpcMethod(name = "push") 212 public void push(Map<String, Object> data) { 213 RpcSession session = RpcContext.getSession(); 214 String queryGuid = (String) session.getProperty("guid"); 215 Rpc rpc = rpcFromMap.get(queryGuid); 216 rpc.push(data); 217 218 if (logger.isDebugEnabled()) { 219 String s = Primitive.stringify(data); 220 logger.debug("kraken logdb: pushed [{}] data [{}]", queryGuid, s); 221 } 222 } 223 224 @RpcMethod(name = "createMapQuery") 225 public int createMapQuery(String queryGuid, String query) { 226 try { 227 RpcSession session = RpcContext.getSession(); 228 String guid = session.getConnection().getPeerGuid(); 229 230 // map query should be set before rpc command parsing 231 MapQuery mq = new MapQuery(guid, session.getConnection()); 232 mapQueries.put(queryGuid, mq); 233 mq.setQuery(queryService.createQuery(query)); 234 235 logger.info("kraken logdb: created map query [{}]", queryGuid); 236 return mq.getQuery().getId(); 237 } catch (Exception e) { 238 logger.error("kraken logdb: cannot create map query", e); 239 throw new RpcException(e.getMessage()); 240 } 241 } 242 243 @RpcMethod(name = "startMapQuery") 244 public void startMapQuery(String queryGuid) { 245 MapQuery mq = mapQueries.get(queryGuid); 246 if (mq == null) 247 throw new RpcException("mapreduce query not found: " + queryGuid); 248 249 queryService.startQuery(mq.getQuery().getId()); 250 logger.info("kraken logdb: started map query [{}]", queryGuid); 251 } 252 253 @RpcMethod(name = "removeMapQuery") 254 public void removeMapQuery(String queryGuid) { 255 MapQuery mq = mapQueries.remove(queryGuid); 256 if (mq == null) 257 throw new RpcException("mapreduce query not found: " + queryGuid); 258 259 logger.info("kraken logdb: removed map query [{}]", queryGuid); 148 260 } 149 261 … … 168 280 RemoteQueryKey key = new RemoteQueryKey(guid, queryId); 169 281 LogQueryStatus status = LogQueryStatus.valueOf(action); 282 283 logger.info("kraken logdb: query status change [{}, status={}]", key, status); 170 284 171 285 switch (status) { … … 192 306 if (q != null) 193 307 q.setEnd(true); 308 309 // TODO: check if all mapper queries are ended 310 // for now, send eof if one mapper query is ended 311 String queryGuid = remoteQueryMappings.get(key); 312 Rpc rpc = rpcFromMap.get(queryGuid); 313 if (rpc != null) 314 rpc.eof(); 315 else 316 logger.warn("kraken logdb: rpcfrom not found for mapreduce query [{}]", queryGuid); 194 317 } 195 318 break; … … 210 333 211 334 @Override 212 public MapReduceQueryStatus createQuery(String query) { 213 return null; 335 public MapReduceQueryStatus createQuery(String queryString) { 336 String queryGuid = UUID.randomUUID().toString(); 337 LogQuery lq = new LogQueryImpl(syntaxProvider, queryString); 338 339 boolean foundReducer = false; 340 List<LogQueryCommand> mapCommands = new ArrayList<LogQueryCommand>(); 341 List<LogQueryCommand> reduceCommands = new ArrayList<LogQueryCommand>(); 342 343 for (LogQueryCommand c : lq.getCommands()) { 344 if (c.isReducer()) 345 foundReducer = true; 346 347 if (foundReducer) 348 reduceCommands.add(c); 349 else 350 mapCommands.add(c); 351 } 352 353 String mapQueryString = buildQueryString(mapCommands); 354 String reduceQueryString = buildQueryString(reduceCommands); 355 356 mapQueryString = mapQueryString + "|rpcto " + queryGuid; 357 reduceQueryString = "rpcfrom " + queryGuid + "|" + reduceQueryString; 358 359 logger.info("kraken logdb: map query [{}]", mapQueryString); 360 logger.info("kraken logdb: reduce query [{}]", reduceQueryString); 361 362 // create map queries 363 List<RemoteMapQuery> mapQueries = new ArrayList<RemoteMapQuery>(); 364 for (RpcConnection c : downstreams.values()) { 365 RpcSession session = null; 366 try { 367 session = c.createSession("logdb-mapreduce"); 368 int id = (Integer) session.call("createMapQuery", queryGuid, mapQueryString); 369 mapQueries.add(new RemoteMapQuery(queryGuid, c.getPeerGuid(), id)); 370 remoteQueryMappings.put(new RemoteQueryKey(c.getPeerGuid(), id), queryGuid); 371 } catch (Exception e) { 372 logger.error("kraken logdb: cannot create mapquery", e); 373 } finally { 374 if (session != null) 375 session.close(); 376 } 377 } 378 379 // create and start reduce query 380 LogQuery q = queryService.createQuery(reduceQueryString); 381 ReduceQuery r = new ReduceQuery(queryGuid, q); 382 reduceQueries.put(queryGuid, r); 383 queryService.startQuery(q.getId()); 384 385 // start map queries (pumping) 386 for (RpcConnection c : downstreams.values()) { 387 RpcSession session = null; 388 try { 389 session = c.createSession("logdb-mapreduce"); 390 session.call("startMapQuery", queryGuid); 391 } catch (Exception e) { 392 logger.error("kraken logdb: cannot start mapquery", e); 393 } finally { 394 if (session != null) 395 session.close(); 396 } 397 } 398 399 return new MapReduceQueryStatus(queryGuid, queryString, mapQueries, r); 400 } 401 402 private String buildQueryString(List<LogQueryCommand> commands) { 403 StringBuilder sb = new StringBuilder(); 404 int i = 0; 405 for (LogQueryCommand c : commands) { 406 if (i++ != 0) 407 sb.append("|"); 408 409 sb.append(c.getQueryString()); 410 } 411 412 return sb.toString(); 214 413 } 215 414 … … 247 446 public void sessionOpened(RpcSessionEvent e) { 248 447 RpcConnection conn = e.getSession().getConnection(); 249 if (! downstreams.containsKey(conn.getPeerGuid())) {448 if (!upstreams.containsKey(conn.getPeerGuid()) && !downstreams.containsKey(conn.getPeerGuid())) { 250 449 downstreams.put(conn.getPeerGuid(), conn); 251 450 logger.info("kraken logdb: downstream connection [{}] opened", conn); … … 277 476 278 477 upstreams.put(conn.getPeerGuid(), conn); 478 conn.bind("logdb-mapreduce", this); 279 479 280 480 for (DataSource ds : dataSourceRegistry.getAll()) -
kraken-logdb/src/main/java/org/krakenapps/logdb/mapreduce/MapReduceQueryStatus.java
r529 r532 5 5 public class MapReduceQueryStatus { 6 6 private String guid; 7 private String owner;8 7 private String query; 9 private List< MapReduceQuery> mapperQueries;10 private MapReduceQuery reducerQuery;8 private List<RemoteMapQuery> mapQueries; 9 private ReduceQuery reduceQuery; 11 10 12 public MapReduceQueryStatus(String guid, String owner, String query) {11 public MapReduceQueryStatus(String guid, String query, List<RemoteMapQuery> mapQueries, ReduceQuery reduceQuery) { 13 12 this.guid = guid; 14 this.owner = owner;15 13 this.query = query; 14 this.mapQueries = mapQueries; 15 this.reduceQuery = reduceQuery; 16 16 } 17 17 … … 20 20 } 21 21 22 public String getOwner() {23 return owner;24 }25 26 22 public String getQuery() { 27 23 return query; 28 24 } 29 25 30 public List< MapReduceQuery> getMapperQueries() {31 return map perQueries;26 public List<RemoteMapQuery> getMapQueries() { 27 return mapQueries; 32 28 } 33 29 34 public void setMap perQueries(List<MapReduceQuery> mapperQueries) {35 this.map perQueries = mapperQueries;30 public void setMapQueries(List<RemoteMapQuery> mapQueries) { 31 this.mapQueries = mapQueries; 36 32 } 37 33 38 public MapReduceQuery getReducerQuery() {39 return reduce rQuery;34 public ReduceQuery getReduceQuery() { 35 return reduceQuery; 40 36 } 41 37 42 public void setReduce rQuery(MapReduceQuery reducerQuery) {43 this.reduce rQuery = reducerQuery;38 public void setReduceQuery(ReduceQuery reduceQuery) { 39 this.reduceQuery = reduceQuery; 44 40 } 45 41 46 42 @Override 47 43 public String toString() { 48 return " arbiter query status: guid=" + guid + ", owner=" + owner + "query=" + query;44 return "mapreduce query status: guid=" + guid + ", query=" + query; 49 45 } 50 46 -
kraken-logdb/src/main/java/org/krakenapps/logdb/query/LogQueryImpl.java
r457 r532 30 30 import org.krakenapps.logdb.LogQueryCallback; 31 31 import org.krakenapps.logdb.LogQueryCommand; 32 import org.krakenapps.logdb.LogQueryService;33 32 import org.krakenapps.logdb.SyntaxProvider; 34 import org.krakenapps.logstorage.LogStorage;35 import org.krakenapps.logstorage.LogTableRegistry;36 33 import org.krakenapps.logdb.LogQueryCommand.Status; 37 34 import org.krakenapps.logdb.LogTimelineCallback; … … 51 48 private Set<LogTimelineCallback> timelineCallbacks = new HashSet<LogTimelineCallback>(); 52 49 53 public LogQueryImpl(SyntaxProvider syntaxProvider, LogQueryService service, LogStorage logStorage, 54 LogTableRegistry tableRegistry, String query) { 55 this.queryString = query; 50 public LogQueryImpl(SyntaxProvider syntaxProvider, String queryString) { 51 this.queryString = queryString; 56 52 57 53 for (String q : queryString.split("\\|")) { -
kraken-logdb/src/main/java/org/krakenapps/logdb/query/LogQueryServiceImpl.java
r529 r532 109 109 @Override 110 110 public LogQuery createQuery(String query) { 111 LogQuery lq = new LogQueryImpl(syntaxProvider, this, logStorage, tableRegistry,query);111 LogQuery lq = new LogQueryImpl(syntaxProvider, query); 112 112 queries.put(lq.getId(), lq); 113 113 lq.registerQueryCallback(new EofReceiver(lq)); -
kraken-logdb/src/main/java/org/krakenapps/logdb/query/command/Rpc.java
r526 r532 2 2 3 3 import java.util.Map; 4 4 import org.krakenapps.api.Primitive; 5 5 import org.krakenapps.logdb.LogQueryCommand; 6 import org.krakenapps.rpc.RpcClient; 7 import org.krakenapps.rpc.RpcConnection; 8 import org.krakenapps.rpc.RpcConnectionProperties; 9 import org.krakenapps.rpc.RpcException; 10 import org.krakenapps.rpc.RpcSession; 11 import org.slf4j.Logger; 12 import org.slf4j.LoggerFactory; 6 13 7 14 public class Rpc extends LogQueryCommand { 15 private final Logger logger = LoggerFactory.getLogger(Rpc.class.getName()); 16 17 private String agentGuid; 18 private RpcConnection upstream; 19 8 20 /** 9 21 * dist query guid … … 13 25 private boolean sender; 14 26 15 public Rpc(String guid, boolean sender) { 27 private RpcConnection datastream; 28 private RpcSession datasession; 29 30 public Rpc(String agentGuid, RpcConnection upstream, String guid, boolean sender) { 31 this.agentGuid = agentGuid; 32 this.upstream = upstream; 16 33 this.guid = guid; 17 34 this.sender = sender; … … 20 37 @Override 21 38 public void push(Map<String, Object> m) { 22 write(m); 39 if (sender) { 40 if (datastream == null) { 41 RpcClient client = new RpcClient(agentGuid); 42 RpcConnectionProperties props = new RpcConnectionProperties(upstream.getRemoteAddress()); 43 props.setPassword((String) upstream.getProperty("password")); 44 datastream = client.connect(props); 45 try { 46 datasession = datastream.createSession("logdb-mapreduce"); 47 datasession.call("setLogStream", guid); 48 } catch (RpcException e) { 49 logger.error("kraken logdb: cannot set log stream", e); 50 } catch (InterruptedException e) { 51 logger.error("kraken logdb: cannot set log stream", e); 52 } 53 54 logger.info("kraken logdb: opened rpc data stream for query guid [{}]", guid); 55 } 56 57 datasession.post("push", m); 58 } else { 59 write(m); 60 } 61 62 if (logger.isDebugEnabled()) 63 logger.debug("kraken logdb: rpc [{}]", Primitive.stringify(m)); 23 64 } 24 65 … … 34 75 35 76 @Override 77 public void eof() { 78 if (datasession != null) { 79 datastream.close(); 80 datastream = null; 81 datasession = null; 82 } 83 84 logger.info("kraken logdb: closed rpc data stream for query guid [{}]", guid); 85 super.eof(); 86 } 87 88 @Override 36 89 public String toString() { 37 90 return "RPC " + (sender ? "Output" : "Input") + guid;
Note: See TracChangeset
for help on using the changeset viewer.
