落地策略
架构:游戏逻辑服务器—Redis—DataSaver—MySQL数据库
游戏数据存储的特点是读少写多,玩家状态数据变化非常频繁,所以每次直接load数据库肯定是不行的。
服务器启动时,从数据库中提取所有数据(私有表除外)进内存,这样load次数减少为1次(用内存换取cpu时间)。逻辑服的脏数据每隔一段时间会向redis做一次修改同步,而存储服务器(DataSaver)不停地读取redis中的数据修改并落地到数据库。
redis起的作用是在服务器宕机时,数据修改不会丢失。(也有人用共享内存文件的方式保证服务器宕机数据不丢失)
对于每次数据改变直接save到数据库也是没有必要的,可以将一段时间内相同字段的修改合并,只执行1次save。
修改合并
数据的变化分为三类:Inserted(插入/新建)、Update(更新)和Deleted(删除)
存在九种合并情况:
插入->插入 or 更新->更新 or 删除->删除:保持原状态,更新字段
插入->更新:保持为插入状态,覆盖更新字段
删除->更新:保持为删除状态,丢弃更新字段(不应出现)
插入->删除 or 更新->删除: 更新为删除状态,清除所有更新字段集合
删除->插入 or 更新->插入:更新为插入状态,记录全字段集合
因为Mysql提供了Replace into语句,插入可以和删除合并:
delete * from table where id = @id;
insert into table values(...);
-- 上面两条语句等价于
replace into table values(...);
代码实现
public void MergeChange(Row newRow)
{
SetFlag(newRow._flag);
if (newRow.Fields != null)
{
foreach (KeyValuePair<string, string> f in newRow.Fields)
{
SetField(f.Key, f.Value);
}
}
if (newRow.IsDeleted())
{
Fields?.Clear();
}
}
public void SetFlag(RowFlag flag)
{
if (flag == RowFlag.Deleted)
{
if (Fields != null) Fields.Clear();
}
if (flag == RowFlag.Inserted || flag == RowFlag.Deleted)
{
_flag = flag;
}
}
程序中总共出现三次合并
流程
step.1
Row定义脏数据对象,记录变动类型RowFlag和字段字典Fields
程序修改内存数据时,调用CacheRedisService提供的RowInserted、DataFieldUpdated、StrFieldUpdated和RowDeleted方法生成Row对象。如果_changed字典中已经缓存了对象,将进行第一次合并。
private Row GetRow(int tableIndex, long id,RowFlag flag)
{
if (!_changed.ContainsKey(tableIndex))
{
_changed[tableIndex] = new Dictionary<long, Row>();
}
Dictionary<long, Row> table = _changed[tableIndex];
if (!table.ContainsKey(id))
{
table[id] = new Row();
}
table[id].SetFlag(flag);
return table[id];
}
step.2~step.3
100ms间隔的定时器触发SyncChangedToRedis消息,将_changed字典压入字典队列ChangeQueue
step.4~step.5
cacheRedisThread线程将ChangeQueue所有元素出列,调用MergeChange方法做第二次合并。合并结果存储在_changeDict
step.6
主要是DoSyncToRedis方法。对于下线玩家已回写以及user_direct_save内容此处跳过。
先来看写入redis中的数据结构:
redisKey = {CCliModuleBase.ZONE_AREA_ID}logic{CCliModuleBase.SERVER_INDEX} or {CCliModuleBase.ZONE_AREA_ID}_pub
batch = CacheRedisServiceUtil.Now()
rc_{redisKey}_zset (zset): field:batch score:batch
rc_{redisKey}_{batch} (set): tableName
tableHashKey=$”rc_{redisKey}{batch}{tableName}” (Hash): field:rowId value:rowFlag
rowHashKey=$”rc_{redisKey}{batch}{tableName}_{rowId}” (Hash): field:fieldName value:fieldValue
例如,在批次1620288272中,id为7060002的玩家升至了80级:
>>zrange rc_2_logic_0_zset 0 -1 #2服双号服有哪些批次
1620288272
>>smembers rc_2_logic_0_1620288272 #1620288272批次有哪些脏表
user
>>hgetall rc_2_logic_0_1620288272_user #脏表user有哪些脏行
7060002
Normal #脏行的修改类型为更新
>>hgetall rc_2_logic_0_1620288272_user_7060002 #脏行涉及到的字段和值
level
80
将脏数据写入1620288272批次时,进行第三次合并:
if (row.Value.IsDeleted())
{
batchTasks.Add(redisBatch.KeyDeleteAsync(rowHashKey));
}
RowFlag rowFlag = row.Value.GetRowFlag();
if(rowFlag == RowFlag.Normal)
{
batchTasks.Add(redisBatch.HashSetAsync(tableHashKey, rowId, rowFlag.ToString(), When.NotExists));
}
else
{
batchTasks.Add(redisBatch.HashSetAsync(tableHashKey, rowId, rowFlag.ToString(), When.Always));
}
DoSyncToRedis写入失败:
- TrySelfHeal检查表名、字段值的空异常
- 检测到异常后再次调用DoSyncToRedis
- 未检测到异常或者再次调用DoSyncToRedis异常后,DoSaveToFile将要写入redis的数据写入本地文件
写入成功后,_changeDict会得到释放,否则将和新入队ChangeQueue的字典进行合并,一直尝试写入直到成功。
生产环境出现Redis停止服务事故时,应等待Redis重启,之后数据正常写入后本地文件自动删除。 如果此时将游戏进程关闭,必须手动将本地文本内的数据写入redis后方可开启服务器。
catch (Exception e)
{
_syncToRedisErrorCnt++;
log.Fatal(String.Format("sync changed to redis error count : {0}, caused by : {1}",_syncToRedisErrorCnt, e));
string fileName = _redisKeyFormatter.GetRedisCmdFileName();
string bakFile = _redisKeyFormatter.GetRedisCmdFileName(true);
if(File.Exists(bakFile)) File.Delete(bakFile);
if(File.Exists(fileName)) File.Move(fileName,bakFile);
// save to file
DoSaveToFile();
int waitInSeconds = _exceptionWaitInSeconds[Math.Min(_syncToRedisBatch, _exceptionWaitInSeconds.Length)-1];
Thread.Sleep(waitInSeconds*1000);
}
写入本地文件时,一处代码int waitInSeconds = _exceptionWaitInSeconds[Math.Min(_syncToRedisBatch_syncToRedisErrorCnt, _exceptionWaitInSeconds.Length)-1];应是笔误
关于注释的解读
// 重连 redis 时,如果再重连失败时产生了新的 batch,则可能不会执行 zadd,所以这里不再区分是否 newBatch
batchTasks.Add(redisBatch.SortedSetAddAsync(_redisKeyFormatter.GetBatchZSetKey(), _currentBatch.ToString(), _currentBatch));
没有对代码进行考古,猜测原逻辑是有个条件判断:只有当前批次号更新了才会写入Redis。
对应的情况:一次DoSyncToRedis时,产生了新的批次号,newBatch(true),但由于写入数据或批次号时失败,导致新的批次号为写入redis。redis服务速度恢复,此时的DoSyncToRedis判断批次号未更新newBatch(false),_changeDict写入成功,因此这里必须再次写入批次号,否则批次号出现丢失。
step.7~step.10
这部分对应DataSaver中的内容,流程概括为:
- TryLock获取回写锁
- 开启回写线程WriteBackThreadRun
- 获取最老批次,判断不是正在写批次则执行回写SaveBatch
SaveBatch主要是处理redis中的数据,拼成sql语句批量执行。具体过程可阅读代码,这里只讲讲配置参数的意义和回写锁
env_data_saver.cfg
save_thread_number 3 // 回写线程数,单双号服和pub服
skip_check_lock 1 // 检查回写锁
print_sql 1 //是否打印 SQL 日志 1-打印 0-不打印
allowable_error_seconds 10 //默认300s, 回写允许时间误差(含 redis 时间误差和机器时间误差)
sql_insert_batch 100 // 每100条插入语句执行一次Mysql插入
sql_update_batch 100 // 每100条更新语句执行一次Mysql更新
sql_delete_batch 100 // 每100条删除语句执行一次Mysql删除
allowable_error_seconds的作用是,DataSaver从redis中读取出的批次,可能是游戏逻辑服务器正在往里写入的批次号。DataSaver只能处理已经写完的批次。判断是否为正在写入的批次号依据是当前时间 - 批次时间 > 批次产生间隔 + allowable_error_seconds
double? score = db.SortedSetScore(batchZsetKey, oldestBatchStr);
uint now = CacheRedisServiceUtil.Now();
if (score != null && now - score > period + _allowableErrorSecond)
{
uint oldestBatch = uint.Parse(oldestBatchStr);
if (!SaveBatch(db, oldestBatch))
{
return false;
}
}
redis分布式锁
DataSaver开启单线程顺序回写脏数据,为了避免误开启多个进程竞争消费,在启动脚本这一层先控制:将启动的pid写入文件,启动时检查文件中对应的pid是否存在
if [ -f $PID_FILE ];
then
old_pid=`cat $PID_FILE`;
pids=`ps aux | grep dotnet |grep DataSaver | awk '{print $2;}'`;
for pid in $pids
do
if [ $pid -eq $old_pid ];
then
echo "DataSaver is running as $pid, please stop it first.";
exit 0;
fi
done
fi
其次,避免多组服务器上配置错误开启多个相同的进程,DataSaver提供了分布式锁:
private bool TryLock()
{
IDatabase db = _redis.GetDatabase();
bool locked = db.StringSet(_lockKey,"1", null,When.NotExists);
if (locked)
{
Thread extendLock = new Thread(ExtendLockThreadRun);
extendLock.Name = String.Format("Saver Thread {0}", _redisKeyFormatter.GetBatchZSetKey());
// extendLock.SetApartmentState(ApartmentState.MTA);
extendLock.Start();
}
...
}
获得锁后,开启ExtendLockThreadRun线程,每休眠1000ms进行续锁