DBTCPConnecror 是对 DBPort 类的封装,借助 DBPort 实现读写操作、获取服务器状态等。
say 方法和 call 方法
DBTCPConnecror 类中比较值得分析的是 say 方法和 call 方法的实现:
// 执行写操作
WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded )
// 执行读操作
Response call( DB db , DBCollection coll , OutMessage m , ServerAddress hostNeeded , int retries )
这两个方法的实现方式相似,实际上都是通过 DBPort 实现,同时增加了检查连接和错误处理的代码。
这里仅以 say 方法的实现作为例子进行说明:
public WriteResult say( DB db , OutMessage m , WriteConcern concern , ServerAddress hostNeeded )
throws MongoException {
// 检查数据库连接
_checkClosed();
checkMaster( false , true );
// 获取数据连接端口
MyPort mp = _myPort.get();
DBPort port = mp.get( true , false , hostNeeded );
try {
// 检查权限
port.checkAuth( db );
// 通过 DBPort 实现写操作
port.say( m );
// 检查错误或返回正确的结果
if ( concern.callGetLastError() ){
return _checkWriteError( db , mp , port , concern );
}
else {
return new WriteResult( db , port , concern );
}
}
catch (...){
// ...
}
finally {
// 结束操作
mp.done( port );
m.doneWithMessage();
}
}
say 和 call 方式是借助 DBPort 实现的,而 DBPort 对象是通过内部类 DBTCPConnector.MyPort 的 get 方法获取的:
// 获取数据库端口
DBPort get( boolean keep , boolean slaveOk , ServerAddress hostNeeded ){
// 如果指定了服务器,就获取指定服务器的端口
if ( hostNeeded != null ){
// asked for a specific host
return _portHolder.get(hostNeeded ).get();
}
// 在一个请求中,如果已经使用了一个端口,则继续使用它
if ( _requestPort != null ){
if ( _requestPort.getPool() == _masterPortPool || !keep ) {
return _requestPort;
}
_requestPort.getPool().done(_requestPort);
_requestPort = null;
}
// 集群部署,可以从 Slave 获取端口
if ( slaveOk && _rsStatus != null ){
// if slaveOk, try to use a secondary
ServerAddress slave = _rsStatus.getASecondary();
if ( slave != null ){
return _portHolder.get( slave ).get();
}
}
// master 端口池为空,抛异常
if (_masterPortPool == null) {
throw new MongoException("Rare case where master=null, probably all servers are down");
}
// 通过 master 端口池获取端口
DBPort p = _masterPortPool.get();
if ( keep && _inRequest ) {
_requestPort = p;
}
return p;
}
其他方法
getServerAddressList (获取服务器地址列表),checkMaster(检查 master 服务器)和 fetchMaxBsonObjectSize (获取 maxBsonObjectSize) 也有一定的分析价值:
// 获取服务器地址列表
public List<ServerAddress> getServerAddressList() {
// 如果是集群方式,则通过 ReplicaSetStatus 返回地址列表
if (_rsStatus != null) {
return _rsStatus.getServerAddressList();
}
// 否则,返回 master 服务器的地址
ServerAddress master = getAddress();
if (master != null) {
List<ServerAddress> list = new ArrayList<ServerAddress>();
list.add(master);
return list;
}
return null;
}
// 检查 master 服务器
void checkMaster( boolean force , boolean failIfNoMaster )
throws MongoException {
// 检查是集群部署还是单机部署
if ( _rsStatus != null ){
if ( _masterPortPool == null || force ){
// 集群部署
// 通过 ReplicaSetStatus 获取 master 节点
ReplicaSetStatus.Node n = _rsStatus.ensureMaster();
if ( n == null ){
if ( failIfNoMaster )
throw new MongoException( "can't find a master" );
}
else {
// 根据 master 节点的信息设置 DBTCPConnector 的属性
_set( n._addr );
maxBsonObjectSize = _rsStatus.getMaxBsonObjectSize();
}
}
} else {
// 单机部署
// 根据服务器信息设置 maxBsonObjectSize
if (maxBsonObjectSize == 0)
maxBsonObjectSize = fetchMaxBsonObjectSize();
}
}
// 获取并设置 maxBsonObjectSize
int fetchMaxBsonObjectSize() {
if (_masterPortPool == null)
return 0;
DBPort port = _masterPortPool.get();
try {
// 连接 admin 数据库,执行检查 master 服务器的命令
CommandResult res = port.runCommand(_mongo.getDB("admin"), new BasicDBObject("isMaster", 1));
// 1.8 之后的版本返回的结果中包含 maxBsonObjectSize
if (res.containsField("maxBsonObjectSize")) {
maxBsonObjectSize = ((Integer) res.get("maxBsonObjectSize")).intValue();
} else {
maxBsonObjectSize = Bytes.MAX_OBJECT_SIZE;
}
} catch (Exception e) {
_logger.log(Level.WARNING, null, e);
} finally {
port.getPool().done(port);
}
return maxBsonObjectSize;
}