网站翻译建设,外国网页设计网站,vps wordpress域名,甘肃省城乡城乡建设厅网站文章目录 问题背景处理思路注意事项代码实现 问题背景
公司内部多个系统共用一套用户体系库#xff0c;对外(钉钉)我们是两个客户身份(这里是根据系统来的)#xff0c;例如当第三方服务向我们发起用户同步请求#xff1a;是一个更新用户操作#xff0c;它会同时发送一个 d… 文章目录 问题背景处理思路注意事项代码实现 问题背景
公司内部多个系统共用一套用户体系库对外(钉钉)我们是两个客户身份(这里是根据系统来的)例如当第三方服务向我们发起用户同步请求是一个更新用户操作它会同时发送一个 delete 和 insert 请求这两个请求几乎是并发进来的实际上应该是先发起的delete 再 insert 实际情况可能和网络延迟也有关系此时在我们系统中就无法保证这两个请求的顺序执行即先 delete 处理完之后 再进行 insert 的数据处理(正常流程)又或者直接把一定时间内同一个用户的 delete 和 insert 操作合并为一个update操作(本质就是更新操作)。
还有一种情况是第三方系统中添加或者 删除一个用户时会以两个客户的身份去发送两个相同的用户同步请求但同一个用户在我们系统内用户数据只有一份对应的接口肯定也都是同一个即相同的添加接口会在一瞬间被调用两次删除即使执行两次的话也没什么问题问题是添加 即使在添加前判断了用户账号是否存在 并发过来的情况下还是避免不了一些脏数据的产生加锁的话对整体影响又特别大。
处理思路
根据userId账号为每个请求分配一个房间单独的线程如果是第一次进来那么就new一个房间也就是类里边会有一个单独的线程处理这个用户的行为后边一定时间内相同的 userId 进来会找到对应已存在的房间当设置的时间窗口到了之后判断当前userId的同步行为有哪些如果有 insert 和 delete那么直接转为 update 操作。如果是两个insert行为那么最后就只调用一次insert服务如果是两个delete行为那么就只调用一个delete服务。
注意事项
时间窗口的设定如果时间设置过短属于同一个操作的请求因为网络波动 请求到接口的时间会有一定间隔如果你设置的时间间隔小于等待的时间还是会把本就属于同一批次的操作 多次处理
测试过程刚开始时间设置的1500ms也就是当第一个userId进来后等待1.5秒后根据这段时间内收集到的用户行为再去真正的处理后来在测试中发现有些本就属于同一批次的请求还是会被处理多次也就是时间调小了改成2000ms测试还是发现同样的问题。最后采取的是根据最近一个的userId请求的时间 等待1500ms即相同的userId的请求进来后 在当前时间再重新计算等待1500ms时间到了之后没有发现新的用户行为即算是一个批次结束
ps可以创建一个单独的服务来负责对请求进行合理的处理分发处理之后再去调用对应的业务系统服务
代码实现
定义操作行为枚举
public enum OperationEnum {INSERT(insert),DELETE(delete),;private final String value;OperationEnum(String operation) {this.value operation;}public String getValue() {return value;}
}定义每个用户所属的房间房间内存储用户的多个行为insert、delete
public class ActionRoomBean {//用于保存有效事件数据 insert or delete, dataprivate MapString, JSONObject actionDataMap new HashMap();private String userId;//真正负责处理事件的线程private DispatchTask dispatchTask;/*** 定义操作方法 排队接收* param action 请求动作insert 或者 delete* param data 请求参数*/public void addAction(String action, JSONObject data) {//有新请求进来后 计数器 1if(dispatchTask ! null){dispatchTask.getIncrementAndGet();}//如果包含直接跳出if (actionDataMap.containsKey(action)) {return;}actionDataMap.put(action, data);}public ActionRoomBean() {}/*** 有参构造* param userId 用户账号* param actionDataMap 操作类型请求参数*/public ActionRoomBean(String userId, MapString, JSONObject actionDataMap) {this.actionDataMap actionDataMap;this.userId userId;}/*** 创建完这个类的实例后要先调用startManager方法 启动线程*/public void startManager() {dispatchTask new DispatchTask(userId, actionDataMap);new Thread(dispatchTask).start();}
}房间内真正的执行者子线程
public class DispatchTask implements Runnable {//等待的时间窗口private static long sleepTime 1500;//计数器用户有新的行为之后 1用来控制是否继续等待sleepprivate final AtomicInteger count new AtomicInteger(0);//用于保存有效事件数据 insert or delete, data与 ActionRoomBean中的 actionDataMap 指向的是同一个地址MapString, JSONObject actionDataMap;//用户账号String userId;/*** 有参构造*/public DispatchTask(String userId, MapString, JSONObject dataLib) {this.userId userId;this.actionDataMap dataLib;}Overridepublic void run() {try {//线程等待前的数量和休眠后被唤醒的数量做对比如果不相等说明休眠时间内有新的用户行为则进入循环继续sleepint afterCount 0;while (afterCount 0 || afterCount ! count.get()){//每休眠一次 1如果下次循环的值与 1之后的afterCount相等说明时间窗口内没有新的行为则不循环afterCount count.incrementAndGet();Thread.sleep(sleepTime);}} catch (InterruptedException e) {throw new RuntimeException(e);}try {String url RestTemplateUtil.DD_READING_API_URL;JSONObject param null;// 只有添加操作if (actionDataMap.containsKey(OperationEnum.INSERT.getValue()) actionDataMap.size() 1) {url /nc/eduUserInsert;param actionDataMap.get(OperationEnum.INSERT.getValue());if (param ! null) {RestTemplateUtil.postForObject(url, param.toJSONString());}} else if (actionDataMap.containsKey(OperationEnum.DELETE.getValue()) actionDataMap.size() 1) {//只有删除操作url /nc/eduUserDelete;param actionDataMap.get(OperationEnum.DELETE.getValue());if (param ! null) {RestTemplateUtil.postForObject(url, param.toJSONString());}} else if (actionDataMap.containsKey(OperationEnum.INSERT.getValue()) actionDataMap.containsKey(OperationEnum.DELETE.getValue())) {//既有添加又有删除就是更新处理url /nc/eduUserUpdate;param actionDataMap.get(OperationEnum.INSERT.getValue());if (param ! null) {RestTemplateUtil.postForObject(url, param.toJSONString());}}} finally {//最后从全局变量中删除userIdDispatchController.closeRoom(userId);}}/*** 计数器 1*/public Integer getIncrementAndGet() {return count.incrementAndGet();}
}控制器
RestController
RequestMapping(value /api/dd)
public class DispatchController {//全局map记录当前有多少个用户正在被处理中private final static MapString, ActionRoomBean allMap new ConcurrentHashMap();//简单的配置的密钥用于接口的身份校验Value(${url.secret})private String secret;/*** insert和 delete 操作都会进入这个接口用 operation 区分当前是什么操作*/PostMapping(value /dispatch)public Result dispatch(RequestBody JSONObject jsonObject,RequestHeader(value secret) String secret){//进行简单的接口身份校验if(!Objects.equals(secret, this.secret)){return Result.generateError(secret eroor);}String userId jsonObject.getString(userId);//operation insert 或者 deleteString operation jsonObject.getString(operation);if(EmptyUtil.isNotEmpty(userId) EmptyUtil.isNotEmpty(operation)){//调用进入房间的方法unboltRoom(userId, operation, jsonObject);}return Result.generateSuccess();}/*** 为每个userId创建一个实例房间* 这里决定了是创建一个新的房间还是进入到已有的房间中*/private void unboltRoom(String userId, String operation, JSONObject jsonObject) {//加锁处理由于真正的执行是在子线程中 所以加锁对整体性能影响也不是很大//主要是避免同一个userId创建了多个实例即使map中key不可重复也会造成请求丢失//例如同一个userId进来insert和delete请求各一个并发不加锁的情况下就有可能创建了两个实例synchronized (this) {ActionRoomBean room allMap.get(userId);//如果全局map中没有说明是这个userId是第一个进来if (room null) {MapString, JSONObject actionMap new HashMap(4);actionMap.put(operation, jsonObject);room new ActionRoomBean(userId, actionMap);//开启计时room.startManager();//放入到全局map中allMap.put(userId, room);}//如果有直接调用 addAction方法room.addAction(operation, jsonObject);}}/*** 当前批次处理完之后从集合中删除用户实例*/public static void closeRoom(String userId) {allMap.remove(userId);}
}整体核心代码就是上边这些以上还可以通过线程池去优化一下。
如果涉及到批量导入同时有大量用户同步数据过来就需要在测试环境进行反复测试 看是否会丢数据因为每个用户都是一个独立的子线程对线程的数量进行优化。