Changeset 537:d8d85fe7264c
- Timestamp:
- 11/14/11 14:56:36 (6 months ago)
- Branch:
- default
- Location:
- kraken-logdb/src/main/java/org/krakenapps/logdb
- Files:
-
- 1 added
- 3 edited
- 1 moved
-
impl/LogDBScript.java (modified) (3 diffs)
-
impl/MapReduceRpcService.java (modified) (8 diffs)
-
mapreduce/MapReduceService.java (modified) (1 diff)
-
query/command/RpcFrom.java (added)
-
query/command/RpcTo.java (moved) (moved from kraken-logdb/src/main/java/org/krakenapps/logdb/query/command/Rpc.java) (3 diffs)
Legend:
- Unmodified
- Added
- Removed
-
kraken-logdb/src/main/java/org/krakenapps/logdb/impl/LogDBScript.java
r532 r537 39 39 import org.krakenapps.logdb.mapreduce.RemoteQuery; 40 40 import org.krakenapps.logdb.query.FileBufferList; 41 import org.krakenapps.logdb.query.command.Rpc; 41 import org.krakenapps.logdb.query.command.RpcFrom; 42 import org.krakenapps.logdb.query.command.RpcTo; 42 43 import org.krakenapps.rpc.RpcConnection; 43 44 import org.krakenapps.rpc.RpcConnectionProperties; … … 236 237 @ScriptArgument(name = "sample string", type = "string", description = "sample string") }) 237 238 public void rpcfrom(String[] args) { 238 Rpc rpc = mapreduce.getRpcFrom(args[0]);239 RpcFrom rpc = mapreduce.getRpcFrom(args[0]); 239 240 if (rpc == null) { 240 241 context.println("rpc not found"); … … 249 250 @ScriptUsage(description = "eof to rpcfrom", arguments = { @ScriptArgument(name = "guid", type = "string", description = "dist query guid") }) 250 251 public void rpceof(String[] args) { 251 Rpc rpc = mapreduce.getRpcFrom(args[0]);252 RpcFrom rpc = mapreduce.getRpcFrom(args[0]); 252 253 if (rpc == null) { 253 254 context.println("rpc not found"); -
kraken-logdb/src/main/java/org/krakenapps/logdb/impl/MapReduceRpcService.java
r534 r537 40 40 import org.krakenapps.logdb.query.LogQueryImpl; 41 41 import org.krakenapps.logdb.query.StringPlaceholder; 42 import org.krakenapps.logdb.query.command.Rpc; 42 import org.krakenapps.logdb.query.command.RpcFrom; 43 import org.krakenapps.logdb.query.command.RpcTo; 43 44 import org.krakenapps.logdb.query.parser.QueryParser; 44 45 import org.krakenapps.rpc.RpcAgent; … … 81 82 * mapreduce query guid to rpcfrom mappings 82 83 */ 83 private ConcurrentMap<String, Rpc > rpcFromMap;84 private ConcurrentMap<String, RpcFrom> rpcFromMap; 84 85 85 86 /** 86 87 * mapreduce query guid to rpcto mappings 87 88 */ 88 private ConcurrentMap<String, Rpc > rpcToMap;89 private ConcurrentMap<String, RpcTo> rpcToMap; 89 90 90 91 /** … … 135 136 public void start() { 136 137 queries = new ConcurrentHashMap<String, MapReduceQueryStatus>(); 137 rpcFromMap = new ConcurrentHashMap<String, Rpc >();138 rpcToMap = new ConcurrentHashMap<String, Rpc >();138 rpcFromMap = new ConcurrentHashMap<String, RpcFrom>(); 139 rpcToMap = new ConcurrentHashMap<String, RpcTo>(); 139 140 upstreams = new ConcurrentHashMap<String, RpcConnection>(); 140 141 downstreams = new ConcurrentHashMap<String, RpcConnection>(); … … 170 171 public Object parse(Binding b) { 171 172 String guid = (String) b.getChildren()[1].getValue(); 172 Rpc rpc = new Rpc(agent.getGuid(), null, guid, false);173 RpcFrom rpc = new RpcFrom(guid); 173 174 rpcFromMap.put(guid, rpc); 174 175 return rpc; … … 187 188 String guid = (String) b.getChildren()[1].getValue(); 188 189 MapQuery mq = mapQueries.get(guid); 189 Rpc rpc = new Rpc(agent.getGuid(), mq.getConnection(), guid, true);190 RpcTo rpc = new RpcTo(agent.getGuid(), mq.getConnection(), guid); 190 191 rpcToMap.put(guid, rpc); 191 192 return rpc; … … 194 195 195 196 @Override 196 public Rpc getRpcFrom(String guid) {197 public RpcFrom getRpcFrom(String guid) { 197 198 return rpcFromMap.get(guid); 198 199 } 199 200 200 201 @Override 201 public Rpc getRpcTo(String guid) {202 public RpcTo getRpcTo(String guid) { 202 203 return rpcToMap.get(guid); 203 204 } … … 213 214 RpcSession session = RpcContext.getSession(); 214 215 String queryGuid = (String) session.getProperty("guid"); 215 Rpc rpc = rpcFromMap.get(queryGuid);216 RpcFrom rpc = rpcFromMap.get(queryGuid); 216 217 rpc.push(data); 217 218 … … 229 230 String nodeGuid = session.getConnection().getPeerGuid(); 230 231 231 Rpc rpc = rpcFromMap.get(queryGuid);232 RpcFrom rpc = rpcFromMap.get(queryGuid); 232 233 if (rpc != null) 233 234 rpc.eof(); -
kraken-logdb/src/main/java/org/krakenapps/logdb/mapreduce/MapReduceService.java
r530 r537 4 4 import java.util.List; 5 5 6 import org.krakenapps.logdb.query.command.Rpc; 6 import org.krakenapps.logdb.query.command.RpcFrom; 7 import org.krakenapps.logdb.query.command.RpcTo; 7 8 import org.krakenapps.rpc.RpcConnection; 8 9 import org.krakenapps.rpc.RpcConnectionProperties; 9 10 10 11 public interface MapReduceService { 11 Rpc getRpcFrom(String guid);12 RpcFrom getRpcFrom(String guid); 12 13 13 Rpc getRpcTo(String guid);14 RpcTo getRpcTo(String guid); 14 15 15 16 List<MapReduceQueryStatus> getQueries(); -
kraken-logdb/src/main/java/org/krakenapps/logdb/query/command/RpcTo.java
r534 r537 12 12 import org.slf4j.LoggerFactory; 13 13 14 public class Rpc extends LogQueryCommand {15 private final Logger logger = LoggerFactory.getLogger(Rpc .class.getName());14 public class RpcTo extends LogQueryCommand { 15 private final Logger logger = LoggerFactory.getLogger(RpcTo.class.getName()); 16 16 17 17 private String agentGuid; … … 23 23 private String guid; 24 24 25 private boolean sender; 26 25 private RpcClient client; 27 26 private RpcConnection datastream; 28 27 private RpcSession datasession; 29 private boolean end;30 28 31 public Rpc (String agentGuid, RpcConnection upstream, String guid, boolean sender) {29 public RpcTo(String agentGuid, RpcConnection upstream, String guid) { 32 30 this.agentGuid = agentGuid; 33 31 this.upstream = upstream; 34 32 this.guid = guid; 35 this.sender = sender;36 33 } 37 34 38 35 @Override 39 36 public void push(Map<String, Object> m) { 40 if (end) { 41 logger.info("kraken logdb: loss (will be fixed) - {}", Primitive.stringify(m)); 42 return; 37 38 if (datastream == null) { 39 client = new RpcClient(agentGuid); 40 RpcConnectionProperties props = new RpcConnectionProperties(upstream.getRemoteAddress()); 41 props.setPassword((String) upstream.getProperty("password")); 42 datastream = client.connect(props); 43 try { 44 datasession = datastream.createSession("logdb-mapreduce"); 45 datasession.call("setLogStream", guid); 46 } catch (RpcException e) { 47 logger.error("kraken logdb: cannot set log stream", e); 48 } catch (InterruptedException e) { 49 logger.error("kraken logdb: cannot set log stream", e); 50 } 51 52 logger.info("kraken logdb: opened rpc data stream for query guid [{}]", guid); 43 53 } 44 54 45 if (sender) { 46 if (datastream == null) { 47 RpcClient client = new RpcClient(agentGuid); 48 RpcConnectionProperties props = new RpcConnectionProperties(upstream.getRemoteAddress()); 49 props.setPassword((String) upstream.getProperty("password")); 50 datastream = client.connect(props); 51 try { 52 datasession = datastream.createSession("logdb-mapreduce"); 53 datasession.call("setLogStream", guid); 54 } catch (RpcException e) { 55 logger.error("kraken logdb: cannot set log stream", e); 56 } catch (InterruptedException e) { 57 logger.error("kraken logdb: cannot set log stream", e); 58 } 59 60 logger.info("kraken logdb: opened rpc data stream for query guid [{}]", guid); 61 } 62 63 datasession.post("push", m); 64 } else { 65 write(m); 66 } 55 datasession.post("push", m); 67 56 68 57 if (logger.isDebugEnabled()) 69 logger.debug("kraken logdb: rpc [{}]", Primitive.stringify(m));58 logger.debug("kraken logdb: rpc mapper [{}]", Primitive.stringify(m)); 70 59 } 71 60 … … 89 78 } 90 79 80 datasession.close(); 91 81 datastream.close(); 82 client.close(); 83 client = null; 92 84 datastream = null; 93 85 datasession = null; 94 86 } 95 87 96 logger.info("kraken logdb: closed rpc datastream for query guid [{}]", guid);88 logger.info("kraken logdb: closed rpc mapper stream for query guid [{}]", guid); 97 89 super.eof(); 98 end = true;99 90 } 100 91 101 92 @Override 102 93 public String toString() { 103 return "RPC " + (sender ? "Output" : "Input") + guid;94 return "RPC Mapper [" + guid + "]"; 104 95 } 105 106 96 }
Note: See TracChangeset
for help on using the changeset viewer.
