Changeset 532:7057fc0c1459 for kraken-logdb/src/main/java/org/krakenapps/logdb/query/command/Rpc.java
Legend:
- Unmodified
- Added
- Removed
-
kraken-logdb/src/main/java/org/krakenapps/logdb/query/command/Rpc.java
r526 r532 2 2 3 3 import java.util.Map; 4 4 import org.krakenapps.api.Primitive; 5 5 import org.krakenapps.logdb.LogQueryCommand; 6 import org.krakenapps.rpc.RpcClient; 7 import org.krakenapps.rpc.RpcConnection; 8 import org.krakenapps.rpc.RpcConnectionProperties; 9 import org.krakenapps.rpc.RpcException; 10 import org.krakenapps.rpc.RpcSession; 11 import org.slf4j.Logger; 12 import org.slf4j.LoggerFactory; 6 13 7 14 public class Rpc extends LogQueryCommand { 15 private final Logger logger = LoggerFactory.getLogger(Rpc.class.getName()); 16 17 private String agentGuid; 18 private RpcConnection upstream; 19 8 20 /** 9 21 * dist query guid … … 13 25 private boolean sender; 14 26 15 public Rpc(String guid, boolean sender) { 27 private RpcConnection datastream; 28 private RpcSession datasession; 29 30 public Rpc(String agentGuid, RpcConnection upstream, String guid, boolean sender) { 31 this.agentGuid = agentGuid; 32 this.upstream = upstream; 16 33 this.guid = guid; 17 34 this.sender = sender; … … 20 37 @Override 21 38 public void push(Map<String, Object> m) { 22 write(m); 39 if (sender) { 40 if (datastream == null) { 41 RpcClient client = new RpcClient(agentGuid); 42 RpcConnectionProperties props = new RpcConnectionProperties(upstream.getRemoteAddress()); 43 props.setPassword((String) upstream.getProperty("password")); 44 datastream = client.connect(props); 45 try { 46 datasession = datastream.createSession("logdb-mapreduce"); 47 datasession.call("setLogStream", guid); 48 } catch (RpcException e) { 49 logger.error("kraken logdb: cannot set log stream", e); 50 } catch (InterruptedException e) { 51 logger.error("kraken logdb: cannot set log stream", e); 52 } 53 54 logger.info("kraken logdb: opened rpc data stream for query guid [{}]", guid); 55 } 56 57 datasession.post("push", m); 58 } else { 59 write(m); 60 } 61 62 if (logger.isDebugEnabled()) 63 logger.debug("kraken logdb: rpc [{}]", Primitive.stringify(m)); 23 64 } 24 65 … … 34 75 35 76 @Override 77 public void eof() { 78 if (datasession != null) { 79 datastream.close(); 80 datastream = null; 81 datasession = null; 82 } 83 84 logger.info("kraken logdb: closed rpc data stream for query guid [{}]", guid); 85 super.eof(); 86 } 87 88 @Override 36 89 public String toString() { 37 90 return "RPC " + (sender ? "Output" : "Input") + guid;
Note: See TracChangeset
for help on using the changeset viewer.
