com.taobao.remoting.impl.DefaultConnection.java // 用来存放请求和回调的MAP private final ConcurrentHashMap<Long, Object[]> requestResidents; //发送消息出去 void sendRequestWithCallback(ConnectionRequest connRequest, ResponseCallback callback, long timeoutMs) { long requestId = connRequest.getId(); long waitBegin = System.currentTimeMillis(); long waitEnd = waitBegin + timeoutMs; Object[] queue = new Object[4]; int idx = 0; queue[idx++] = waitEnd; queue[idx++] = waitBegin; //用于记录日志 queue[idx++] = connRequest; //用于记录日志 queue[idx++] = callback; requestResidents.put(requestId, queue); // 记录响应队列 write(connRequest); // 埋点记录等待响应的Map的大小 StatLog.addStat("TBRemoting-ResponseQueues", "size", requestResidents.size(), 1L); } public void write(final Object connectionMsg) { //mina里的IoSession.write()发送消息 WriteFuture writeFuture = ioSession.write(connectionMsg); // 注册FutureListener,当请求发送失败后,能够立即做出响应 writeFuture.addListener(new MsgWrittenListener(this, connectionMsg)); } /** * 在得到响应后,删除对应的请求队列,并执行回调 * 调用者:MINA线程 */ public void putResponse(final ConnectionResponse connResp) { final long requestId = connResp.getRequestId(); Object[] queue = requestResidents.remove(requestId); if (null == queue) { Object appResp = connResp.getAppResponse(); String appRespClazz = (null == appResp) ? "null" : appResp.getClass().getName(); StringBuilder sb = new StringBuilder(); sb.append("Not found response receiver for requestId=[").append(requestId).append("],"); sb.append("from [").append(connResp.getHost()).append("],"); sb.append("response type [").append(appRespClazz).append("]."); LOGGER.warn(sb.toString()); return; } int idx = 0; idx++; long waitBegin = (Long) queue[idx++]; ConnectionRequest connRequest = (ConnectionRequest) queue[idx++]; ResponseCallback callback = (ResponseCallback) queue[idx++]; // ** 把回调任务交给业务提供的线程池执行 ** Executor callbackExecutor = callback.getExecutor(); callbackExecutor.execute(new CallbackExecutorTask(connResp, callback)); long duration = System.currentTimeMillis() - waitBegin; // 实际读响应时间 logIfResponseError(connResp, duration, connRequest.getAppRequest()); } |