服务器框架(二) 游戏数据的落地

Posted by qijun on May 6, 2021

落地策略

架构:游戏逻辑服务器—Redis—DataSaver—MySQL数据库

游戏数据存储的特点是读少写多,玩家状态数据变化非常频繁,所以每次直接load数据库肯定是不行的。

服务器启动时,从数据库中提取所有数据(私有表除外)进内存,这样load次数减少为1次(用内存换取cpu时间)。逻辑服的脏数据每隔一段时间会向redis做一次修改同步,而存储服务器(DataSaver)不停地读取redis中的数据修改并落地到数据库。

redis起的作用是在服务器宕机时,数据修改不会丢失。(也有人用共享内存文件的方式保证服务器宕机数据不丢失)

对于每次数据改变直接save到数据库也是没有必要的,可以将一段时间内相同字段的修改合并,只执行1次save。

修改合并

数据的变化分为三类:Inserted(插入/新建)、Update(更新)和Deleted(删除)

存在九种合并情况:

插入->插入 or 更新->更新 or 删除->删除:保持原状态,更新字段

插入->更新:保持为插入状态,覆盖更新字段

删除->更新:保持为删除状态,丢弃更新字段(不应出现)

插入->删除 or 更新->删除: 更新为删除状态,清除所有更新字段集合

删除->插入 or 更新->插入:更新为插入状态,记录全字段集合

因为Mysql提供了Replace into语句,插入可以和删除合并:

delete * from table where id = @id; 
insert into table values(...);

-- 上面两条语句等价于
replace into table values(...);

代码实现

public void MergeChange(Row newRow)
{
    SetFlag(newRow._flag);
    if (newRow.Fields != null)
    {
        foreach (KeyValuePair<string, string> f in newRow.Fields)
        {
            SetField(f.Key, f.Value);
        }
    }

    if (newRow.IsDeleted())
    {
        Fields?.Clear();
    }
}

public void SetFlag(RowFlag flag)
{
    if (flag == RowFlag.Deleted)
    {
        if (Fields != null) Fields.Clear();
    }

    if (flag == RowFlag.Inserted || flag == RowFlag.Deleted)
    {
        _flag = flag;
    }
}

程序中总共出现三次合并

流程

datasaver

step.1

Row定义脏数据对象,记录变动类型RowFlag和字段字典Fields

程序修改内存数据时,调用CacheRedisService提供的RowInserted、DataFieldUpdated、StrFieldUpdated和RowDeleted方法生成Row对象。如果_changed字典中已经缓存了对象,将进行第一次合并。

private Row GetRow(int tableIndex, long id,RowFlag flag)
{
    if (!_changed.ContainsKey(tableIndex))
    {
        _changed[tableIndex] = new Dictionary<long, Row>();
    }
    Dictionary<long, Row> table = _changed[tableIndex];
    if (!table.ContainsKey(id))
    {
        table[id] = new Row();
    }
    table[id].SetFlag(flag);
    return table[id];
}
step.2~step.3

100ms间隔的定时器触发SyncChangedToRedis消息,将_changed字典压入字典队列ChangeQueue

step.4~step.5

cacheRedisThread线程将ChangeQueue所有元素出列,调用MergeChange方法做第二次合并。合并结果存储在_changeDict

step.6

主要是DoSyncToRedis方法。对于下线玩家已回写以及user_direct_save内容此处跳过。

先来看写入redis中的数据结构:

redisKey = {CCliModuleBase.ZONE_AREA_ID}logic{CCliModuleBase.SERVER_INDEX} or {CCliModuleBase.ZONE_AREA_ID}_pub

batch = CacheRedisServiceUtil.Now()

rc_{redisKey}_zset (zset): field:batch score:batch

rc_{redisKey}_{batch} (set): tableName

tableHashKey=$”rc_{redisKey}{batch}{tableName}” (Hash): field:rowId value:rowFlag

rowHashKey=$”rc_{redisKey}{batch}{tableName}_{rowId}” (Hash): field:fieldName value:fieldValue

例如,在批次1620288272中,id为7060002的玩家升至了80级:

>>zrange rc_2_logic_0_zset 0 -1 #2服双号服有哪些批次
1620288272

>>smembers rc_2_logic_0_1620288272 #1620288272批次有哪些脏表
user

>>hgetall rc_2_logic_0_1620288272_user #脏表user有哪些脏行
7060002
Normal #脏行的修改类型为更新

>>hgetall rc_2_logic_0_1620288272_user_7060002 #脏行涉及到的字段和值
level
80

将脏数据写入1620288272批次时,进行第三次合并:

if (row.Value.IsDeleted())
{
    batchTasks.Add(redisBatch.KeyDeleteAsync(rowHashKey));
}

RowFlag rowFlag = row.Value.GetRowFlag();
if(rowFlag == RowFlag.Normal)
{
    batchTasks.Add(redisBatch.HashSetAsync(tableHashKey, rowId, rowFlag.ToString(), When.NotExists));
}
else
{
    batchTasks.Add(redisBatch.HashSetAsync(tableHashKey, rowId, rowFlag.ToString(), When.Always));
}

DoSyncToRedis写入失败:

  1. TrySelfHeal检查表名、字段值的空异常
  2. 检测到异常后再次调用DoSyncToRedis
  3. 未检测到异常或者再次调用DoSyncToRedis异常后,DoSaveToFile将要写入redis的数据写入本地文件

写入成功后,_changeDict会得到释放,否则将和新入队ChangeQueue的字典进行合并,一直尝试写入直到成功。

生产环境出现Redis停止服务事故时,应等待Redis重启,之后数据正常写入后本地文件自动删除。 如果此时将游戏进程关闭,必须手动将本地文本内的数据写入redis后方可开启服务器。

catch (Exception e)
{
    _syncToRedisErrorCnt++;
    log.Fatal(String.Format("sync changed to redis error count : {0}, caused by : {1}",_syncToRedisErrorCnt, e));
    
    string fileName = _redisKeyFormatter.GetRedisCmdFileName();
    string bakFile = _redisKeyFormatter.GetRedisCmdFileName(true);
    if(File.Exists(bakFile)) File.Delete(bakFile);
    if(File.Exists(fileName)) File.Move(fileName,bakFile);

    // save to file
    DoSaveToFile();

    int waitInSeconds = _exceptionWaitInSeconds[Math.Min(_syncToRedisBatch, _exceptionWaitInSeconds.Length)-1];
    Thread.Sleep(waitInSeconds*1000);
}

写入本地文件时,一处代码int waitInSeconds = _exceptionWaitInSeconds[Math.Min(_syncToRedisBatch_syncToRedisErrorCnt, _exceptionWaitInSeconds.Length)-1];应是笔误

关于注释的解读

// 重连 redis 时,如果再重连失败时产生了新的 batch,则可能不会执行 zadd,所以这里不再区分是否 newBatch
batchTasks.Add(redisBatch.SortedSetAddAsync(_redisKeyFormatter.GetBatchZSetKey(), _currentBatch.ToString(), _currentBatch));

没有对代码进行考古,猜测原逻辑是有个条件判断:只有当前批次号更新了才会写入Redis。

对应的情况:一次DoSyncToRedis时,产生了新的批次号,newBatch(true),但由于写入数据或批次号时失败,导致新的批次号为写入redis。redis服务速度恢复,此时的DoSyncToRedis判断批次号未更新newBatch(false),_changeDict写入成功,因此这里必须再次写入批次号,否则批次号出现丢失。

step.7~step.10

这部分对应DataSaver中的内容,流程概括为:

  1. TryLock获取回写锁
  2. 开启回写线程WriteBackThreadRun
  3. 获取最老批次,判断不是正在写批次则执行回写SaveBatch

SaveBatch主要是处理redis中的数据,拼成sql语句批量执行。具体过程可阅读代码,这里只讲讲配置参数的意义和回写锁

env_data_saver.cfg

save_thread_number 3 // 回写线程数,单双号服和pub服

skip_check_lock 1 // 检查回写锁

print_sql 1 //是否打印 SQL 日志 1-打印 0-不打印

allowable_error_seconds 10 //默认300s, 回写允许时间误差(含 redis 时间误差和机器时间误差)

sql_insert_batch 100 // 每100条插入语句执行一次Mysql插入

sql_update_batch 100 // 每100条更新语句执行一次Mysql更新

sql_delete_batch 100 // 每100条删除语句执行一次Mysql删除

allowable_error_seconds的作用是,DataSaver从redis中读取出的批次,可能是游戏逻辑服务器正在往里写入的批次号。DataSaver只能处理已经写完的批次。判断是否为正在写入的批次号依据是当前时间 - 批次时间 > 批次产生间隔 + allowable_error_seconds

 double? score = db.SortedSetScore(batchZsetKey, oldestBatchStr);
uint now = CacheRedisServiceUtil.Now();
if (score != null && now - score > period + _allowableErrorSecond)
{
    uint oldestBatch = uint.Parse(oldestBatchStr);
    if (!SaveBatch(db, oldestBatch))
    {
        return false;
    }
}

redis分布式锁

DataSaver开启单线程顺序回写脏数据,为了避免误开启多个进程竞争消费,在启动脚本这一层先控制:将启动的pid写入文件,启动时检查文件中对应的pid是否存在

if [ -f $PID_FILE ];
	then
		old_pid=`cat $PID_FILE`;
		pids=`ps aux | grep dotnet |grep DataSaver | awk '{print $2;}'`;
		for pid in $pids
		do
		        if [ $pid -eq $old_pid ];
		        then
		                echo "DataSaver is running as $pid, please stop it first.";
		                exit 0;
		        fi
		done
	fi

其次,避免多组服务器上配置错误开启多个相同的进程,DataSaver提供了分布式锁:

 private bool TryLock()
{
    IDatabase db = _redis.GetDatabase();
    bool locked = db.StringSet(_lockKey,"1", null,When.NotExists);
    if (locked)
    {
        Thread extendLock = new Thread(ExtendLockThreadRun);
        extendLock.Name = String.Format("Saver Thread {0}", _redisKeyFormatter.GetBatchZSetKey());
        //      extendLock.SetApartmentState(ApartmentState.MTA);
        extendLock.Start();
    }
    ...
}

获得锁后,开启ExtendLockThreadRun线程,每休眠1000ms进行续锁