随笔-159  评论-114  文章-7  trackbacks-0
最近在funplus做游戏,进而研究了一个新型架构。

之前做游戏都是自己使用java搭建架构,经过几年的积累确实也达到了最初的设想,多进程,进程内多线程,无锁,0延迟纯jdbc写库。对于单服架构来说,已经趋近于极致。

今年小游戏盛行,如海盗来了,疯狂游戏那家公司,全部使用的都是go+mongodb实现的,因为go的语言级别支援高并发,这点是java无法比拟的。不过java开源项目多,有很多的高手铺垫了超多的框架,比如vertx,akka都可以更加充分的释放java的能力。就看使用者的认识水平了。

本次选择vertx,主要是其在网络通讯这块,对netty的包装,加上自己的eventloop模型,使得响应web请求速度基本属于前3的水平。

netServer = vertx.createHttpServer(httpServerOptions);
        netServer.requestHandler();
        netServer.requestHandler(hs 
-> {
            
if (hs.path().equals("/ping")) {
                hs.response().end(
"pong");
                
return;
            }
            hs.response().close();
            
return;
        });
        
        netServer.websocketHandler(ws 
-> {
            
if (!ws.path().equals("/" + wsname)) {
                ws.reject();
                
return;
            }
            Player player 
= new Player(ws, ws.binaryHandlerID());
            players.put(player.getConnId(), player);
            player.setServerUUID(gateAdress);
            
//日志
            if (log.isDebugEnabled()) {
                SocketAddress addrLocal 
= ws.localAddress();
                log.debug(
"新建一个连接:连接ID={},  本地端口={}, 远程地址={}", player.getconnId(), addrLocal.port(), ws.remoteAddress());
            }
            
//有连接过来了
            ws.binaryMessageHandler(data -> {
                
int readableBytes = data.length();
                
if (readableBytes < IMessage.MIN_MESSAGE_LENGTH) {
                    
return;
                }
                
int len = data.getShort(0);
                
if (len > 64 * 1024) {
                    log.error(
"conn:" + player.getId() + "  发送数据包过大:" + len);
                    
return;
                }
                
if (readableBytes < len) {
                    
return;
                }

                CGMessage msg 
= decode(data);
                
if (msg == nullreturn;
                inputHandler(msg, player);
            });
            ws.exceptionHandler(e 
-> {
                
if (e.getMessage().equals("WebSocket is closed")) {
//                    player.disconnect();
                }
                
//断连的日志就不打印堆栈了
                if (e.getMessage().contains("Player reset by peer"|| e.getMessage().contains("远程主机强迫关闭了一个现有的连接")) {
                    log.error(
"主动断开:connId={},cause={}", player.getconnId(), e.getCause());
                } 
else {
                    
//输出错误日志
                    log.error("发生异常:connId={},cause={}", player.getconnId(), e.getCause());
                }
            });
            ws.closeHandler(t 
-> {
//                if (player == null) return;
                
//连接状态
                
//日志
                if (log.isDebugEnabled()) {
                    log.debug(
"连接关闭:connId={}, status={}", player.getconnId(), player == null ? "" : player.toString());
                }
                
if (player.getState() == PlayerState.connected || player.getState() == PlayerState.init || player.getState() == PlayerState.logouting) {
                    player.setState(PlayerState.logouted);
                    
//Remove掉 session connId = Player
                    
//删掉连接对应的player
                    players.remove(player.getConnId());
                    
return;
                }
                
if (player.getUserInfo() == null) {
                    
//删掉连接对应的player
                    players.remove(player.getConnId());
                    
return;
                }
                gateService.closePlayer(player.getconnId(), ar 
-> {
                    
if (ar.failed()) {
                        Loggers.coreLogger.error(
"player connId:" + player.getconnId() + " 离线退出异常!!!" + ar.cause().getMessage());
                    }
                    
//删掉连接对应的player
                    players.remove(player.getConnId());
                });

            });
        }).listen(port, host, res 
-> {
            
if (res.succeeded()) {
                
//启动日志信息
                log.info(" started. Listen: " + port + "  vert:" + vertx.hashCode());
                future.complete();
            }
        });
vertx能方便的使用eventloop线程池响应玩家发来的请求,并永远在特定线程进行代码调用。

比自己使用hash线程池靠谱很多。ps. 自己造轮子不是不好,主要实现方法不一定测试完整,有意想不到的情况,就要自己来趟坑。

后面主要是说一下,但如果大规模请求MongoDB,需要更高的MongoDB响应要求。进而想到要加缓存机制,最初想到的是redis+mongodb,自己实现读通过,写通过。
如果redis不存在,则从mongodb读取,并放入缓存,写数据先写缓存,后写mongodb。

自己实现的这种存储机制,比较low。所以继续寻找缓存方案。

过程中,发现了一个曝光率不高的框架,也就是Apache Ignite。最新一代数据网格。

关键的一步,就是如果让vertx与Ignite工作到一起。这是一个必要的条件。

package cn.empires;

import cn.empires.common.Globals;
import cn.empires.common.contants.Loggers;
import cn.empires.gs.support.observer.Event;
import cn.empires.verticle.OnlineVerticle;
import io.vertx.core.DeploymentOptions;
import io.vertx.core.Launcher;
import io.vertx.core.Vertx;
import io.vertx.core.VertxOptions;
import io.vertx.core.json.JsonObject;

public class MainLaunch extends Launcher {

    
private JsonObject config;
    
    
public static void main(String[] args) {
        System.setProperty(
"logFileName""gateServer");
        
new MainLaunch().dispatch(args);
    }
    
    @Override
    
protected String getDefaultCommand() {
        
return super.getDefaultCommand();
    }
 
    @Override
    
protected String getMainVerticle() {
        
return "cn.empires.verticle.GateVerticle";
    }
    
    @Override
    
public void afterConfigParsed(JsonObject config) {
        
super.afterConfigParsed(config);
        
this.config = config;
    }
    
    @Override
    
public void beforeStartingVertx(VertxOptions options) {
        options.setClustered(
true);
    }
    
    @Override
    
public void afterStartingVertx(Vertx vertx) {
        
super.afterStartingVertx(vertx);
        
//config.put("redis.password", "123456");
        
//初始化全局相关信息
        ListenerInit.init(Event.instance);
        Loggers.coreLogger.info(
"Globals init .");
        Globals.init(vertx, config);
        vertx.deployVerticle(OnlineVerticle.
classnew DeploymentOptions().setConfig(config));
    }
    
    @Override
    
public void beforeDeployingVerticle(DeploymentOptions deploymentOptions) {
        
super.beforeDeployingVerticle(deploymentOptions);
    }
    
    @Override
    
public void beforeStoppingVertx(Vertx vertx) {
        
super.beforeStoppingVertx(vertx);
    }
    
    @Override
    
public void afterStoppingVertx() {
        
super.afterStoppingVertx();
    }
    
    @Override
    
public void handleDeployFailed(Vertx vertx, String mainVerticle, DeploymentOptions deploymentOptions, Throwable cause) {
        
super.handleDeployFailed(vertx, mainVerticle, deploymentOptions, cause);
    }
    
}

如果想使用Ignite的缓存,必须需要Ignite实例对象。否则无法获取。
if (ignite == null) {
     ClusterManager clusterManager 
= ((VertxInternal) vertx).getClusterManager();
     String uuid 
= clusterManager.getNodeID();
     ignite 
= Ignition.ignite(UUID.fromString(uuid));
}

在classpath中,配置一个ignite.xml,vertx启动的时候自动会加载ignite.xml,然后使用IgniteManager进行集群管理。
我只贴一遍ignite.xml配置
<?xml version="1.0" encoding="UTF-8"?>

<!--
  Licensed to the Apache Software Foundation (ASF) under one or more
  contributor license agreements.  See the NOTICE file distributed with
  this work for additional information regarding copyright ownership.
  The ASF licenses this file to You under the Apache License, Version 2.0
  (the "License"); you may not use this file except in compliance with
  the License.  You may obtain a copy of the License at

       http://www.apache.org/licenses/LICENSE-2.0

  Unless required by applicable law or agreed to in writing, software
  distributed under the License is distributed on an "AS IS" BASIS,
  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  See the License for the specific language governing permissions and
  limitations under the License.
-->

<!--
    Ignite Spring configuration file to startup Ignite cache.

    This file demonstrates how to configure cache using Spring. Provided cache
    will be created on node startup.

    Use this configuration file when running HTTP REST examples (see 'examples/rest' folder).

    When starting a standalone node, you need to execute the following command:
    {IGNITE_HOME}/bin/ignite.{bat|sh} examples/config/example-cache.xml

    When starting Ignite from Java IDE, pass path to this file to Ignition:
    Ignition.start("examples/config/example-cache.xml");
-->
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi
="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation
="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd"
>
    
<bean id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        
<property name="dataStorageConfiguration">
            
<bean class="org.apache.ignite.configuration.DataStorageConfiguration">
                
<!-- Set the page size to 4 KB -->
                  
<property name="pageSize" value="4096"/>
                
<!-- Set concurrency level -->
                  
<property name="concurrencyLevel" value="6"/>
                  
<property name="systemRegionInitialSize" value="#{40 * 1024 * 1024}"/>
                  
<property name="systemRegionMaxSize" value="#{80 * 1024 * 1024}"/>
                  
<property name="defaultDataRegionConfiguration">
                    
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        
<property name="name" value="Default_Region"/>
                        
<!-- 设置默认内存区最大内存为 512M. -->
                        
<property name="maxSize" value="#{512L * 1024 * 1024}"/>
                        
<!-- Enabling RANDOM_LRU eviction for this region.  -->
                            
<property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                    
</bean>
                
</property>
               
<property name="dataRegionConfigurations">
                    
<list>
                      
<!--
                          Defining a data region that will consume up to 500 MB of RAM and 
                          will have eviction and persistence enabled.
                      
-->
                      
<bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                        
<!-- Custom region name. -->
                        
<property name="name" value="500MB_Region"/>
            
                        
<!-- 100 MB initial size. -->
                        
<property name="initialSize" value="#{100L * 1024 * 1024}"/>
            
                        
<!-- 500 MB maximum size. -->
                        
<property name="maxSize" value="#{500L * 1024 * 1024}"/>
                        
                        
<!-- Enabling RANDOM_LRU eviction for this region.  -->
                            
<property name="pageEvictionMode" value="RANDOM_2_LRU"/>
                      
</bean>
                    
</list>
                
</property>
            
</bean>
        
</property>
        
<property name="cacheConfiguration">
            
<list>
                   
<bean class="org.apache.ignite.configuration.CacheConfiguration">
                           
<property name="name" value="UserInfo"/>
                           
<property name="cacheMode" value="PARTITIONED"/>
                           
<property name="atomicityMode" value="ATOMIC"/>
                           
<property name="backups" value="0"/>
                           
<property name="cacheStoreFactory">
                               
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                                   
<constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
                               
</bean>
                           
</property>
                           
<property name="readThrough" value="true"/>
                           
<property name="writeThrough" value="true"/>
                           
<property name="writeBehindEnabled" value="true"/>
                           
<property name="writeBehindFlushSize" value="1024"/>
                           
<property name="writeBehindFlushFrequency" value="5"/>
                           
<property name="writeBehindFlushThreadCount" value="1"/>
                           
<property name="writeBehindBatchSize" value="512"/>
                           
<property name="dataRegionName" value="Default_Region"/>
                
</bean>
            
</list>
        
</property>
        
<property name="failureDetectionTimeout" value="60000"/>
        
<!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        
<property name="discoverySpi">
            
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                
<property name="ipFinder">
                    
<!--
                        Ignite provides several options for automatic discovery that can be used
                        instead os static IP based discovery. For information on all options refer
                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
                    
-->
                    
<!-- Uncomment static IP finder to enable static-based discovery of initial nodes. -->
                    
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                    
<!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> -->
                        
<property name="addresses">
                            
<list>
                                
<!-- In distributed environment, replace with actual host IP address. -->
                                
<value>127.0.0.1:47500..47509</value>
                            
</list>
                        
</property>
                    
</bean>
                
</property>
            
</bean>
        
</property>
    
</bean>
</beans>

Ignite 对内存有细致的划分,可以分多个区域Region,每个区域有自己的配置,比如设置初始大小和最大大小,以及淘汰策略。
UserInfo对应的CacheConfigurationCache使用进行了配置,比如readThrough writeThrough writeBehindEnabled等等,细致的配置诸如后写刷新频率writeBehindFlushFrequency为5,表示5秒才会刷新一次更新数据。

    public static <T> IgniteCache<String, T> createIgniteCache(String cacheName, Class<? extends CacheStoreAdapter<String, T>> clazz) {
        CacheConfiguration
<String, T> cacheCfg = new CacheConfiguration<>(cacheName);
        
return Globals.ignite().getOrCreateCache(cacheCfg);
    }

在Globals工具类,提供工具方法获得IgniteCache对象。

package cn.empires.gs.player.service.impl;

import org.apache.ignite.IgniteCache;
import org.apache.ignite.lang.IgniteFuture;

import cn.empires.common.Globals;
import cn.empires.common.cache.UserCacheStore;
import cn.empires.common.service.ServiceBase;
import cn.empires.gs.model.UserInfo;
import cn.empires.gs.player.service.UserService;
import io.vertx.core.AsyncResult;
import io.vertx.core.Future;
import io.vertx.core.Handler;
import io.vertx.core.Vertx;
import io.vertx.core.json.JsonObject;

public class UserServiceImpl extends ServiceBase implements UserService {
    
    
private final IgniteCache<String, UserInfo> cache;

    
public UserServiceImpl(Vertx vertx, JsonObject config) {
        
super(vertx, config);
        cache 
= Globals.createIgniteCache(UserInfo.tableName, UserCacheStore.class);
    }

    @Override
    
public UserService getUserInfo(String id, Handler<AsyncResult<UserInfo>> handler) {
        IgniteFuture
<UserInfo> future = cache.getAsync(id);
        future.listen(h 
-> {
            
if(h.isDone()) {
                handler.handle(Future.succeededFuture(h.get()));
            }
        });        
        
return this;
    }
    

    @Override
    
public UserService saveUserInfo(UserInfo userInfo, Handler<AsyncResult<UserInfo>> handler) {
        IgniteFuture
<Void> future = cache.putAsync(userInfo.get_id(), userInfo);
        future.listen(h 
-> {
            
if(h.isDone()) {
                handler.handle(Future.succeededFuture(userInfo));
            }
        });
        
return this;
    }

}

最后一件事,就是同步写库,可以读通过从MongoDB进行读取。

package cn.empires.common.cache;

import java.util.ArrayList;
import java.util.List;

import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;

import org.apache.ignite.IgniteException;
import org.apache.ignite.cache.store.CacheStoreAdapter;
import org.apache.ignite.lifecycle.LifecycleAware;
import org.bson.Document;

import com.mongodb.Block;
import com.mongodb.client.FindIterable;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.model.Filters;
import com.mongodb.client.model.UpdateOptions;

import cn.empires.common.Globals;
import cn.empires.common.contants.Loggers;
import cn.empires.gs.model.UserInfo;
import io.vertx.core.json.JsonObject;

public class UserCacheStore extends CacheStoreAdapter<String, UserInfo> implements LifecycleAware {
    
    
/** Mongo collection. */
    
private MongoCollection<Document> collection;
    
    @Override
    
public void start() throws IgniteException {
    }

    @Override
    
public UserInfo load(String key) throws CacheLoaderException {
        
if(collection == null) {
            collection 
= Globals.mongoDb().getCollection(UserInfo.tableName);
        }
        FindIterable
<Document> iter = collection.find(Filters.eq("_id", key));
        
final List<JsonObject> result = new ArrayList<>(1);
        iter.forEach(
new Block<Document>() {
            
public void apply(Document _doc) {
                result.add(
new JsonObject(_doc.toJson()));
            }
        });
        
if(result != null && !result.isEmpty()) {
            Loggers.userLogger.info(
"CacheStore load UserInfo.");
            JsonObject jsonObj 
= result.get(0);
            
return UserInfo.fromDB(jsonObj);
        }
        
return null;
    }

    @Override
    
public void write(Entry<? extends String, ? extends UserInfo> entry) throws CacheWriterException {
        
if(collection == null) {
            collection 
= Globals.mongoDb().getCollection(UserInfo.tableName);
        }
        Document filter 
= new Document();
        filter.append(
"_id", entry.getKey());
        
        Document replacement 
= new Document();
        replacement.append(
"value", entry.getValue().toString());
        collection.replaceOne(filter, replacement, 
new UpdateOptions().upsert(true));
        Loggers.userLogger.info(
"CacheStore saved UserInfo.");
    }

    @Override
    
public void delete(Object key) throws CacheWriterException {
        
    }



    @Override
    
public void stop() throws IgniteException {
        
    }

}

由于在ignite.xml中进行了配置
<bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
    
<constructor-arg value="cn.empires.common.cache.UserCacheStore"/>
</bean>
,所以在使用Cache获取UserInfo的时候,如果不存在对应的信息,就会从MongoDB读取。

更多的信息只能下一篇文章再写了。有问题可以留言。

posted on 2018-11-13 14:29 北国狼人的BloG 阅读(1553) 评论(0)  编辑  收藏

只有注册用户登录后才能发表评论。


网站导航: