observer mode

模式介绍:

代码示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
// observer_async_queue.c
// gcc -std=c11 -Wall -Wextra -O2 observer_async_queue.c -o demo && ./demo

#include <stdio.h>
#include <stdint.h>
#include <string.h>

/* =========================
* 1) 事件定义:ID + payload(拷贝在事件里,避免指针悬挂)
* ========================= */

typedef enum {
EVT_A = 0,
EVT_B,
EVT_LOG,
EVT_MAX
} event_id_t;

typedef struct {
event_id_t id;
union {
int value; // EVT_A/EVT_B 用
char msg[32]; // EVT_LOG 用
} data;
} event_t;

/* =========================
* 2) 观察者:回调
* ========================= */
typedef void (*observer_cb_t)(const event_t *e, void *user);

typedef struct {
observer_cb_t cb;
void *user;
} observer_t;

/* =========================
* 3) Subject / EventBus:每类事件一个订阅表(静态数组)
* ========================= */
#define MAX_OBS 8

typedef struct {
observer_t list[EVT_MAX][MAX_OBS];
uint8_t count[EVT_MAX];
} event_bus_t;

static void bus_init(event_bus_t *b)
{
for (int i = 0; i < EVT_MAX; i++) b->count[i] = 0;
}

static int bus_subscribe(event_bus_t *b, event_id_t id, observer_cb_t cb, void *user)
{
if (!b || !cb || id >= EVT_MAX) return -1;
uint8_t n = b->count[id];
if (n >= MAX_OBS) return -2;
b->list[id][n].cb = cb;
b->list[id][n].user = user;
b->count[id] = (uint8_t)(n + 1);
return 0;
}

static void bus_notify(event_bus_t *b, const event_t *e)
{
if (!b || !e || e->id >= EVT_MAX) return;
uint8_t n = b->count[e->id];
for (uint8_t i = 0; i < n; i++) {
observer_t *o = &b->list[e->id][i];
o->cb(e, o->user);
}
}

/* =========================
* 4) 事件队列:环形缓冲区(异步派发核心)
* ========================= */
#define QCAP 16

typedef struct {
event_t q[QCAP];
uint8_t head, tail, size;
uint32_t dropped; // 队列满导致丢事件计数(可观测性)
} event_queue_t;

static void q_init(event_queue_t *q)
{
q->head = q->tail = q->size = 0;
q->dropped = 0;
}

/* publish:只入队,不回调 */
static int q_push(event_queue_t *q, const event_t *e)
{
if (!q || !e) return -1;
if (q->size >= QCAP) {
q->dropped++;
return -2; // full
}
q->q[q->tail] = *e; // 事件整体拷贝,payload 不悬空
q->tail = (uint8_t)((q->tail + 1) % QCAP);
q->size++;
return 0;
}

static int q_pop(event_queue_t *q, event_t *out)
{
if (!q || !out) return -1;
if (q->size == 0) return -2; // empty
*out = q->q[q->head];
q->head = (uint8_t)((q->head + 1) % QCAP);
q->size--;
return 0;
}

/* =========================
* 5) 一个“调度器”:把队列里的事件取出来,通知观察者
* ========================= */
static void dispatch_all(event_bus_t *bus, event_queue_t *q)
{
event_t e;
while (q_pop(q, &e) == 0) {
bus_notify(bus, &e);
}
}

/* =========================
* 6) 观察者示例
* ========================= */
static void obs_print(const event_t *e, void *user)
{
const char *name = (const char*)user;
if (e->id == EVT_LOG) {
printf("[%s] LOG: %s\n", name, e->data.msg);
} else {
printf("[%s] EVT_%d value=%d\n", name, (int)e->id, e->data.value);
}
}

typedef struct { int sum; } acc_t;

static void obs_acc(const event_t *e, void *user)
{
acc_t *a = (acc_t*)user;
if (e->id == EVT_A || e->id == EVT_B) {
a->sum += e->data.value;
printf("[ACC] sum=%d\n", a->sum);
}
}

/* =========================
* 7) “发布者”示例:模拟产生事件(只入队)
* ========================= */
static void publish_A(event_queue_t *q, int v)
{
event_t e = { .id = EVT_A };
e.data.value = v;
(void)q_push(q, &e);
}

static void publish_B(event_queue_t *q, int v)
{
event_t e = { .id = EVT_B };
e.data.value = v;
(void)q_push(q, &e);
}

static void publish_LOG(event_queue_t *q, const char *msg)
{
event_t e = { .id = EVT_LOG };
snprintf(e.data.msg, sizeof(e.data.msg), "%s", msg);
(void)q_push(q, &e);
}

/* =========================
* 8) main:演示异步派发
* ========================= */
int main(void)
{
event_bus_t bus;
event_queue_t q;
bus_init(&bus);
q_init(&q);

acc_t acc = {0};

/* 订阅:注意这时不会立刻收到事件,只有 dispatch 时才会收到 */
bus_subscribe(&bus, EVT_A, obs_print, (void*)"PRINT");
bus_subscribe(&bus, EVT_B, obs_print, (void*)"PRINT");
bus_subscribe(&bus, EVT_LOG,obs_print, (void*)"PRINT");
bus_subscribe(&bus, EVT_A, obs_acc, &acc);
bus_subscribe(&bus, EVT_B, obs_acc, &acc);

printf("=== Async Observer: publish only enqueues ===\n");

/* “生产者”连续发布:此时不会打印,因为还没 dispatch */
publish_A(&q, 10);
publish_B(&q, 3);
publish_LOG(&q, "hello (will print later)");
publish_A(&q, 7);

printf("\n(No output above from observers yet, because not dispatched)\n");

/* 主循环某次 tick:统一派发 */
printf("\n=== dispatch_all() ===\n");
dispatch_all(&bus, &q);

/* 再发布一波,模拟主循环多次 tick */
printf("\n=== publish more, then dispatch again ===\n");
publish_LOG(&q, "tick2 start");
publish_B(&q, 100);
publish_A(&q, -5);

dispatch_all(&bus, &q);

printf("\nQueue dropped=%u (if queue overflow happened)\n", (unsigned)q.dropped);
return 0;
}


一、代码整体功能总结

这段代码实现了一个异步事件总线(EventBus)系统,核心是基于观察者模式 + 环形队列,将「事件发布」和「事件处理」解耦:

  • 发布者只负责将事件放入队列,不直接触发处理;
  • 调度器在合适时机(如主循环 tick)从队列取出事件,统一通知所有订阅该事件的观察者;
  • 全程通过拷贝事件数据避免指针悬挂,同时记录队列溢出的丢包数,保证鲁棒性和可观测性。

二、模块拆解与核心逻辑分析

代码按功能拆分为 8 个模块,逐一解析:

1. 事件定义(核心数据结构)

1
2
3
4
5
6
7
8
typedef enum { EVT_A = 0, EVT_B, EVT_LOG, EVT_MAX } event_id_t;
typedef struct {
event_id_t id;
union {
int value; // EVT_A/EVT_B 用
char msg[32]; // EVT_LOG 用
} data;
} event_t;
  • 设计亮点
    • 用枚举定义事件 ID,清晰区分不同事件类型;
    • union(共用体)存储不同事件的 payload(有效载荷),节省内存;
    • 关键:payload 直接拷贝到 event_t 中,而非存储指针,彻底避免「指针悬挂」(比如原数据释放后事件指针失效)。

2. 观察者定义(订阅者抽象)

1
2
3
4
5
typedef void (*observer_cb_t)(const event_t *e, void *user);
typedef struct {
observer_cb_t cb; // 事件回调函数
void *user; // 观察者自定义数据(避免全局变量)
} observer_t;
  • 观察者的核心是「回调函数 + 用户数据」:
    • observer_cb_t:定义回调函数签名,入参是事件指针和用户数据;
    • observer_t:封装回调和用户数据,让每个观察者可以携带自己的上下文(比如累加器、打印名称)。

3. 事件总线(Subject/EventBus)

1
2
3
4
5
#define MAX_OBS 8
typedef struct {
observer_t list[EVT_MAX][MAX_OBS]; // 每个事件对应一个观察者列表
uint8_t count[EVT_MAX]; // 每个事件的订阅者数量
} event_bus_t;

这是「订阅 - 通知」的核心,负责管理所有事件的订阅者:

  • bus_init:初始化每个事件的订阅者计数为 0;
  • bus_subscribe:为指定事件添加观察者,做了参数校验(非空、事件 ID 合法)和容量检查(最多 8 个观察者);
  • bus_notify:遍历指定事件的所有观察者,调用其回调函数(同步通知的核心,但本代码中仅在异步派发时调用)。

4. 事件队列(异步核心:环形缓冲区)

1
2
3
4
5
6
#define QCAP 16
typedef struct {
event_t q[QCAP];
uint8_t head, tail, size;
uint32_t dropped; // 丢事件计数(可观测性)
} event_queue_t;

环形队列是实现「异步」的关键,将事件发布和处理解耦:

  • q_push:事件入队(发布事件的核心操作),直接拷贝整个event_t,队列满则更新dropped(丢包计数)并返回错误;
  • q_pop:事件出队,拷贝到输出参数,队列为空则返回错误;
  • 环形缓冲区通过head(头指针)、tail(尾指针)、size(当前元素数)实现,避免数组越界。

5. 调度器(事件派发核心)

1
2
3
4
5
6
7
static void dispatch_all(event_bus_t *bus, event_queue_t *q)
{
event_t e;
while (q_pop(q, &e) == 0) { // 循环出队,直到队空
bus_notify(bus, &e); // 通知该事件的所有观察者
}
}
  • 调度器的作用是「消费队列中的事件」:在主循环的某个 tick 调用dispatch_all,统一处理所有待处理事件;
  • 这是「异步」的核心体现:发布事件时只入队,不触发回调,只有调度器执行时才处理。

6. 观察者示例(具体的事件处理逻辑)

  • obs_print:通用打印观察者,根据事件类型打印不同内容,user参数是打印标识(如 “PRINT”);
  • obs_acc:累加观察者,只处理 EVT_A/EVT_B,将事件的 value 累加到acc_t结构体中(user参数是累加器指针)。

7. 发布者示例(事件生产逻辑)

  • publish_A/B/LOG:封装事件创建和入队逻辑,构造对应类型的event_t,调用q_push入队(仅入队,不触发回调)。

8. main 函数(演示流程)

演示了完整的异步事件派发流程:

  1. 初始化事件总线和队列;
  2. 订阅观察者(PRINT 订阅所有事件,ACC 订阅 EVT_A/B);
  3. 发布一批事件(仅入队,无输出);
  4. 调用dispatch_all,处理队列中的事件(此时才会打印输出);
  5. 再次发布事件并 dispatch,验证多次 tick 的处理逻辑;
  6. 打印丢包数(演示可观测性)。

三、代码执行效果(预期输出)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
=== Async Observer: publish only enqueues ===

(No output above from observers yet, because not dispatched)

=== dispatch_all() ===
[PRINT] EVT_0 value=10
[ACC] sum=10
[PRINT] EVT_1 value=3
[ACC] sum=13
[PRINT] LOG: hello (will print later)
[PRINT] EVT_0 value=7
[ACC] sum=20

=== publish more, then dispatch again ===
[PRINT] LOG: tick2 start
[PRINT] EVT_1 value=100
[ACC] sum=120
[PRINT] EVT_0 value=-5
[ACC] sum=115

Queue dropped=0 (if queue overflow happened)