免费行情软件网站下载,展台展览,小程序代理商费用,网站生成器apk怎么做skynet源码学习-skynet_mq队列 核心数据结构分析核心接口详解设计优势分析工作流程详解消息传递全流程队列扩容流程 核心数据结构分析
消息结构 (skynet_message)
struct skynet_message {uint32_t source; // 消息来源服务句柄int session; // 会话ID#xff08;用于… skynet源码学习-skynet_mq队列 核心数据结构分析核心接口详解设计优势分析工作流程详解消息传递全流程队列扩容流程 核心数据结构分析
消息结构 (skynet_message)
struct skynet_message {uint32_t source; // 消息来源服务句柄int session; // 会话ID用于RPCvoid * data; // 消息数据指针size_t sz; // 消息大小含类型信息
};消息类型编码通过sz的高8位存储
#define MESSAGE_TYPE_MASK (SIZE_MAX 8)
#define MESSAGE_TYPE_SHIFT ((sizeof(size_t)-1) * 8)服务消息队列 (message_queue)
struct message_queue {struct spinlock lock; // 自旋锁uint32_t handle; // 所属服务句柄int cap; // 队列容量int head; // 队列头指针int tail; // 队列尾指针int release; // 释放标记int in_global; // 是否在全局队列中int overload; // 过载计数int overload_threshold; // 过载阈值动态调整struct skynet_message *queue; // 环形缓冲区struct message_queue *next; // 全局队列链表指针
};全局队列 (global_queue)
struct global_queue {struct message_queue *head; // 队列头struct message_queue *tail; // 队列尾struct spinlock lock; // 自旋锁
};全局单例static struct global_queue *Q NULL;
核心接口详解
void skynet_mq_init()
void
skynet_mq_init() {struct global_queue *q skynet_malloc(sizeof(*q));memset(q,0,sizeof(*q));SPIN_INIT(q);Qq;
}功能初始化全局消息队列系统流程 分配全局队列内存初始化头尾指针为NULL初始化自旋锁 调用时机Skynet启动时
struct message_queue * skynet_mq_create(uint32_t handle)
struct message_queue *
skynet_mq_create(uint32_t handle) {struct message_queue *q skynet_malloc(sizeof(*q));q-handle handle;q-cap DEFAULT_QUEUE_SIZE;q-head 0;q-tail 0;SPIN_INIT(q)// When the queue is create (always between service create and service init) ,// set in_global flag to avoid push it to global queue .// If the service init success, skynet_context_new will call skynet_mq_push to push it to global queue.q-in_global MQ_IN_GLOBAL;q-release 0;q-overload 0;q-overload_threshold MQ_OVERLOAD;q-queue skynet_malloc(sizeof(struct skynet_message) * q-cap);q-next NULL;return q;
}功能创建服务私有消息队列 参数handle - 服务句柄 流程 初始值 容量64in_global1初始状态overload_threshold1024
void skynet_mq_push(struct message_queue *q, struct skynet_message *message)
void
skynet_mq_push(struct message_queue *q, struct skynet_message *message) {assert(message);SPIN_LOCK(q)q-queue[q-tail] *message;if ( q-tail q-cap) {q-tail 0;}if (q-head q-tail) {expand_queue(q);}if (q-in_global 0) {q-in_global MQ_IN_GLOBAL;skynet_globalmq_push(q);}SPIN_UNLOCK(q)
}功能向服务队列推送消息核心逻辑
SPIN_LOCK(q)
1. 消息存入环形缓冲区尾部
2. 如果缓冲区满则扩容expand_queue
3. 若队列不在全局队列中q-in_global MQ_IN_GLOBAL调用 skynet_globalmq_push(q)
SPIN_UNLOCK(q)int skynet_mq_pop(struct message_queue *q, struct skynet_message *message)
int
skynet_mq_pop(struct message_queue *q, struct skynet_message *message) {int ret 1;SPIN_LOCK(q)if (q-head ! q-tail) {*message q-queue[q-head];ret 0;int head q-head;int tail q-tail;int cap q-cap;if (head cap) {q-head head 0;}int length tail - head;if (length 0) {length cap;}while (length q-overload_threshold) {q-overload length;q-overload_threshold * 2;}} else {// reset overload_threshold when queue is emptyq-overload_threshold MQ_OVERLOAD;}if (ret) {q-in_global 0;}SPIN_UNLOCK(q)return ret;
}功能从服务队列取出消息返回值0成功1无消息核心逻辑
SPIN_LOCK(q)
if (队列非空) {1. 复制头部消息到*message2. 移动头指针3. 动态调整过载阈值当队列长度 overload_threshold 时overload 当前长度overload_threshold * 2
} else {重置overload_threshold MQ_OVERLOAD标记q-in_global 0移出全局队列
}
SPIN_UNLOCK(q)void skynet_globalmq_push(struct message_queue *queue)
void
skynet_globalmq_push(struct message_queue * queue) {struct global_queue *q Q;SPIN_LOCK(q)assert(queue-next NULL);if(q-tail) {q-tail-next queue;q-tail queue;} else {q-head q-tail queue;}SPIN_UNLOCK(q)
}功能将服务队列加入全局队列 流程 设计要点全局队列是服务队列的链表
struct message_queue * skynet_globalmq_pop()
struct message_queue *
skynet_globalmq_pop() {struct global_queue *q Q;SPIN_LOCK(q)struct message_queue *mq q-head;if(mq) {q-head mq-next;if(q-head NULL) {assert(mq q-tail);q-tail NULL;}mq-next NULL;}SPIN_UNLOCK(q)return mq;
}功能从全局队列获取一个待处理的服务队列流程
SPIN_LOCK(Q)
1. 从链表头部取出一个服务队列
2. 调整链表头指针
SPIN_UNLOCK(Q)
返回服务队列指针void skynet_mq_mark_release(struct message_queue *q)
void
skynet_mq_mark_release(struct message_queue *q) {SPIN_LOCK(q)assert(q-release 0);q-release 1;if (q-in_global ! MQ_IN_GLOBAL) {skynet_globalmq_push(q);}SPIN_UNLOCK(q)
}功能标记服务队列待释放流程
SPIN_LOCK(q)
1. 设置 q-release 1
2. 若不在全局队列中加入全局队列
SPIN_UNLOCK(q)目的确保待释放队列能被正确处理
void skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud)
void
skynet_mq_release(struct message_queue *q, message_drop drop_func, void *ud) {SPIN_LOCK(q)if (q-release) {SPIN_UNLOCK(q)_drop_queue(q, drop_func, ud);} else {skynet_globalmq_push(q);SPIN_UNLOCK(q)}
}功能释放服务队列资源参数 drop_func消息丢弃回调函数ud用户数据 流程
辅助接口
uint32_t skynet_mq_handle()获取队列所属服务句柄
uint32_t
skynet_mq_handle(struct message_queue *q) {return q-handle;
}int skynet_mq_length()计算当前队列消息数
int
skynet_mq_length(struct message_queue *q) {int head, tail,cap;SPIN_LOCK(q)head q-head;tail q-tail;cap q-cap;SPIN_UNLOCK(q)if (head tail) {return tail - head;}return tail cap - head;
}int skynet_mq_overload()获取并重置过载计数
int
skynet_mq_overload(struct message_queue *q) {if (q-overload) {int overload q-overload;q-overload 0;return overload;} return 0;
}设计优势分析
三级队列结构
优势 全局队列O(1)获取待处理服务服务队列隔离不同服务的消息避免全局锁竞争
环形缓冲区设计
内存布局
[头] 消息1 | 消息2 | ... | 消息N [尾]优势 避免内存碎片动态扩容翻倍策略高效头尾指针操作
智能过载处理
// 动态阈值调整
while (length q-overload_threshold) {q-overload length;q-overload_threshold * 2; // 指数退避
}优势 避免频繁触发过载检测指数退避减少检测开销提供过载监控接口
精细锁控制
锁策略
资源锁类型粒度全局队列自旋锁全局服务队列自旋锁单个服务消息处理无锁-
优势 服务间并行处理最小化临界区
优雅释放机制 标记阶段skynet_mq_mark_release清理阶段skynet_mq_release消息回调通过drop_func处理残留消息
优势 安全释放资源避免消息丢失支持自定义清理逻辑
工作流程详解
消息传递全流程 队列扩容流程