基础知识
1.src/msg
1.1 相关的类
1.1 .1 message
1.1.2 connection
1.1.3 messenger
1.1.4 dispatcher
1.1.5 Accepter
1.1.6 Pipe
1.1.7 DispatchQueue
1.2 详细过程
实例
monitor 例子
网络层处理-基础知识
引用:https://blog.csdn.net/zhq5515/article/details/49814941
基础知识
1.src/msg
msg它是客户端与服务器之间通信的底层模块,用于在客户端与服务器之间发送和接收请求。
在src/msg 目录:首先定义网络通信框架,完成通信接口与具体实现的分离,子目录Simple,Async,XIO有三种不同的实现方法。
simple:每个网络连接将创建两个线程,一个负责接收,另一个负责发送。
Async模式:基于事件的使用I/O多路复用模式。它在网络通信中被广泛使用,但在网络通信中却被广泛使用ceph还在实验阶段?
XIO:使用开源网络通信库accelio实现。实验阶段?
1.1 相关的类
1.1 .1 message
类Message: src/msg/Message.h Message.cc
它是所有消息的基本类别,任何要发送的消息都应该继承。数据格式如图所示:
| header | user_data | footer |
class Message : public RefCountedObject { protected: ceph_msg_header header; // headerelope ceph_msg_footer footer; bufferlist payload; // "front" unaligned blob bufferlist middle; // "middle" unaligned blob bufferlist data; // data payload (page-alignment will be preserved where possible) ... };
消息内容可分为三部分
- header
- user data
- footer。
user data 可分为三部分
- payload
- middle
- data
ceph_msg_header 定义新闻传输相关元数据的消息头;
ceph_msg_footer在消息的尾部,添加了一些crc验证数据和消息结束标记。
信息带的数据user data分别保存在payload,middle,data这三个bufferlist中。
payload一般保存ceph操作相关元数据;
middle目前尚未使用;
data一般是读写数据。
1.1.2 connection
src/msg/connection.h
是端(port)对端的socket的链接的封装。其最重要的接口是可以发送消息。
1.1.3 messenger
是整个网络抽象模块(采用架构 Publish/subscribe(发布/订阅) 网络模块的基本设计模式)是定义的api接口。
这类作为消息, 各个 子类作为新闻。Messenger 收到消息后,通过 Pipe 读取消息,然后转给 Dispatcher 处理
SimpleMessenger Messenger 接口的实现
1.1.4 dispatcher
src/msg/dispatcher.h
dispatcher是新闻分发界面。
注册dispatcher类别用于接收message请求分发给具体处理的应用层;client一个需要实现dispatcher处理接收到的函数的函数ACK应对消息。
类别是订阅者的基本类别,具体的订阅后端(Monitor, osd等)继承这一类,Monitor, osd等都是继承自己dispatcher。
1.1.5 Accepter
监听 peer 的请求, 当有新的请求时, 调用 SimpleMessenger::add_accept_pipe() 创建新的 Pipe 到 SimpleMessenger::pipes 处理此请求
1.1.6 Pipe
主要有两个组件用于读取和发送消息,Pipe::Reader 和 Pipe::Writer, 分别用于处理 读取和发送消息. 这两类都是 class Thread 子类意味着每次处理消息都会创建两个线程.
消息被 Pipe::Reader 读取后,将通知线程 注册到 Messenger::dispatchers 中的某一个 Dispatcher(如 Monitor) 处理, 处理完成后,将回复信息放入 SimpleMessenger::Pipe::out_q 中,供 Pipe::Writer 来处理发送
1.1.7 DispatchQueue
用于缓存收到的消息, 然后唤醒 DispatchQueue::dispatch_thread 找到后端的线程 Dispatch 处理消息。
1.2 详细过程
数据流/流程分析:https://blog.51cto.com/wendashuai/2497104
实例
网络模块的使用
https://blog.csdn.net/changtao381/article/details/50915328
monitor 例子
int main(int argc, char *argv[])
{
// 创建一个 Messenger 对象,由于 Messenger 是抽象类,不能直接实例化,提供了一个
// ::create 的方法来创建子类,目前 Ceph 所有模块使用 SimpleMessenger
Messenger *messenger = Messenger::create(g_ceph_context,
entity_name_t::MON(rank),
"mon",
0);
/**
* 执行 socket() -> bind() -> listen() 等一系列动作, 执行流程如下:
SimpleMessenger::bind()
--> Accepter::bind()
socket() -> bind() -> listen()
*/
err = messenger->bind(ipaddr);
// 创建一个 Dispatch 的子类对象, 这里是 Monitor
mon = new Monitor(g_ceph_context, g_conf->name.get_id(), store,
messenger, &monmap);
// 启动 Reaper 线程 (reaper 收割机-收割消息?)
messenger->start();
/**
* a). 初始化 Monitor 模块
* b). 通过 SimpleMessenger::add_dispatcher_tail() 注册自己到
* SimpleMessenger::dispatchers 中, 流程如下:
* Messenger::add_dispatcher_tail()
* --> ready()
* --> dispatch_queue.start()(新 DispatchQueue 线程)
--> Accepter::start()(启动start线程)
* --> accept?? accepter::entry
* --> SimpleMessenger::add_accept_pipe
* --> Pipe::start_reader 启动reader_thread,reader_thread调用reader()函数
* --> Pipe::reader()
* 在 ready() 中: 通过 Messenger::reader(),
* 1) DispatchQueue 线程会被启动,用于缓存收到的消息消息
* 2) Accepter 线程启动,开始监听新的连接请求.
*/
mon->init();
// 进入 mainloop, 等待退出
messenger->wait();
return 0;
}
收到连接请求
请求的监听和处理由 SimpleMessenger::ready –> Accepter::entry 实现
void SimpleMessenger::ready()
{
// 启动 DispatchQueue 线程
dispatch_queue.start();
lock.Lock();
// 启动 Accepter 线程监听客户端连接, 见下面的 Accepter::entry
if (did_bind)
accepter.start();
lock.Unlock();
}
void *Accepter::entry()
{
struct pollfd pfd;
// listen_sd 是 Accepter::bind()中创建绑定的 socket
pfd.fd = listen_sd;
pfd.events = POLLIN | POLLERR | POLLNVAL | POLLHUP;
while (!done)
{
int r = poll(&pfd, 1, -1);
if (pfd.revents & (POLLERR | POLLNVAL | POLLHUP))
break;
if (done) break;
entity_addr_t addr;
socklen_t slen = sizeof(addr.ss_addr());
int sd = ::accept(listen_sd, (sockaddr *)&addr.ss_addr(), &slen);
if (sd >= 0)
{
// 调用 SimpleMessenger::add_accept_pipe() 处理这个连接
msgr->add_accept_pipe(sd);
}
}
return 0;
}
随后创建pipe()开始消息的处理
Pipe *SimpleMessenger::add_accept_pipe(int sd)
{
lock.Lock();
Pipe *p = new Pipe(this, Pipe::STATE_ACCEPTING, NULL);
p->sd = sd;
p->pipe_lock.Lock();
//
/**
* 调用 Pipe::start_reader() 开始读取消息, 将会创建一个读线程开始处理.
* Pipe::start_reader() --> Pipe::reader
*/
p->start_reader();
p->pipe_lock.Unlock();
pipes.insert(p);
accepting_pipes.insert(p);
lock.Unlock();
return p;
}
处理消息由 Pipe::start_reader() –> Pipe::reader() 开始,此时已经是在 Reader 线程中. 首先会调用 accept() 做一些简答的处理然后创建 Writer() 线程,等待发送回复 消息. 然后读取消息, 读取完成之后, 将收到的消息封装在 Message 中,交由 dispatch_queue() 处理.
dispatch_queue() 找到注册者,将消息转交给它处理,处理完成唤醒 Writer() 线程发送回复消息.
void Pipe::reader()
{
/**
* Pipe::accept() 会调用 Pipe::start_writer() 创建 writer 线程, 进入 writer 线程
* 后,会 cond.Wait() 等待被激活,激活的流程看下面的说明. Writer 线程的创建见后后面
* Pipe::accept() 的分析
*/
if (state == STATE_ACCEPTING)
{
accept();
}
while (state != STATE_CLOSED &&
state != STATE_CONNECTING)
{
// 读取消息类型,某些消息会马上激活 writer 线程先处理
if (tcp_read((char *)&tag, 1) < 0)
{
continue;
}
if (tag == CEPH_MSGR_TAG_KEEPALIVE)
{
continue;
}
if (tag == CEPH_MSGR_TAG_KEEPALIVE2)
{
continue;
}
if (tag == CEPH_MSGR_TAG_KEEPALIVE2_ACK)
{
continue;
}
if (tag == CEPH_MSGR_TAG_ACK)
{
continue;
}
else if (tag == CEPH_MSGR_TAG_MSG)
{
// 收到 MSG 消息
Message *m = 0;
// 将消息读取到 new 到的 Message 对象
int r = read_message(&m, auth_handler.get());
// 先激活 writer 线程 ACK 这个消息
cond.Signal(); // wake up writer, to ack this
// 如果该次请求是可以延迟处理的请求,将 msg 放到 Pipe::DelayedDelivery::delay_queue,
// 后面通过相关模块再处理
// 注意,一般来讲收到的消息分为三类:
// 1. 直接可以在 reader 线程中处理,如上面的 CEPH_MSGR_TAG_ACK
// 2. 正常处理, 需要将消息放入 DispatchQueue 中,由后端注册的消息处理,然后唤醒发送线程发送
// 3. 延迟发送, 下面的这种消息, 由定时时间决定什么时候发送
if (delay_thread)
{
utime_t release;
if (rand() % 10000 < msgr->cct->_conf->ms_inject_delay_probability * 10000.0)
{
release = m->get_recv_stamp();
release += msgr->cct->_conf->ms_inject_delay_max * (double)(rand() % 10000) / 10000.0;
lsubdout(msgr->cct, ms, 1) << "queue_received will delay until " << release << " on " << m << " " << *m << dendl;
}
delay_thread->queue(release, m);
}
else
{
// 正常处理的消息,放到 Pipe::DispatchQueue *in_q 中, 以下是整个消息的流程
// DispatchQueue::enqueue()
// --> mqueue.enqueue() -> cond.Signal()(激活唤醒 DispatchQueue::dispatch_thread 线程)
// --> DispatchQueue::dispatch_thread::entry() 该线程得到唤醒
// --> Messenger::ms_deliver_XXX
// --> 具体的 Dispatch 实例, 如 Monitor::ms_dispatch()
// --> Messenger::send_message()
// --> SimpleMessenger::submit_message()
// --> Pipe::_send()
// --> Pipe::out_q[].push_back(m) -> cond.Signal 激活 writer 线程
// --> ::sendmsg()//发送到 socket
in_q->enqueue(m, m->get_priority(), conn_id);
}
}
else if (tag == CEPH_MSGR_TAG_CLOSE)
{
cond.Signal();
break;
}
else
{
ldout(msgr->cct, 0) << "reader bad tag " << (int)tag << dendl;
pipe_lock.Lock();
fault(true);
}
}
}
Pipe::accept() 做一些简单的协议检查和认证处理,之后创建 Writer() 线程: Pipe::start_writer() –> Pipe::Writer
int Pipe::accept()
{
ldout(msgr->cct, 10) << "accept" << dendl;
// 检查自己和对方的协议版本等信息是否一致等操作
// ......
while (1)
{
// 协议检查等操作
// ......
/**
* 通知注册者有新的 accept 请求过来,如果 Dispatcher 的子类有实现
* Dispatcher::ms_handle_accept(),则会调用该方法处理
*/
msgr->dispatch_queue.queue_accept(connection_state.get());
// 发送 reply 和认证相关的消息
// ......
if (state != STATE_CLOSED)
{
/**
* 前面的协议检查,认证等都完成之后,开始创建 Writer() 线程等待注册者
* 处理完消息之后发送
*
*/
start_writer();
}
ldout(msgr->cct, 20) << "accept done" << dendl;
/**
* 如果该消息是延迟发送的消息, 且相关的发送线程没有启动,启动之
* Pipe::maybe_start_delay_thread()
* --> Pipe::DelayedDelivery::entry()
*/
maybe_start_delay_thread();
return 0; // success.
}
}
随后 Writer 线程等待被唤醒发送回复消息
void Pipe::writer()
{
while (state != STATE_CLOSED) // && state != STATE_WAIT) {
{
if (state != STATE_CONNECTING && state != STATE_WAIT && state != STATE_STANDBY &&
(is_queued() || in_seq > in_seq_acked))
{
// 对 keepalive, keepalive2, ack 包的处理
// ......
// 从 Pipe::out_q 中得到一个取出包准备发送
Message *m = _get_next_outgoing();
if (m)
{
// 对包进行一些加密处理
m->encode(features, !msgr->cct->_conf->ms_nocrc);
// 包头
ceph_msg_header &header = m->get_header();
ceph_msg_footer &footer = m->get_footer();
// 取出要发送的二进制数据
bufferlist blist = m->get_payload();
blist.append(m->get_middle());
blist.append(m->get_data());
// 发送包: Pipe::write_message() --> Pipe::do_sendmsg --> ::sendmsg()
ldout(msgr->cct, 20) << "writer sending " << m->get_seq() << " " << m << dendl;
int rc = write_message(header, footer, blist);
m->put();
}
continue;
}
// 等待被 Reader 或者 Dispatcher 唤醒
ldout(msgr->cct, 20) << "writer sleeping" << dendl;
cond.Wait(pipe_lock);
}
}
消息的处理
Reader 线程将消息交给 dispatch_queue 处理,流程如下:
Pipe::reader() –> Pipe::in_q->enqueue()
void DispatchQueue::enqueue(Message *m, int priority, uint64_t id)
{
Mutex::Locker l(lock);
ldout(cct, 20) << "queue " << m << " prio " << priority << dendl;
add_arrival(m);
// 将消息按优先级放入 DispatchQueue::mqueue 中
if (priority >= CEPH_MSG_PRIO_LOW)
{
mqueue.enqueue_strict(
id, priority, QueueItem(m));
}
else
{
mqueue.enqueue(
id, priority, m->get_cost(), QueueItem(m));
}
// 唤醒 DispatchQueue::entry() 处理消息
cond.Signal();
}
void DispatchQueue::entry()
{
while (true)
{
while (!mqueue.empty())
{
QueueItem qitem = mqueue.dequeue();
Message *m = qitem.get_message();
/**
* 交给 Messenger::ms_deliver_dispatch() 处理,后者会找到
* Monitor/OSD 等的 ms_deliver_dispatch() 开始对消息的逻辑处理
* Messenger::ms_deliver_dispatch()
* --> Monitor::ms_dispatch()
*/
msgr->ms_deliver_dispatch(m);
}
if (stop)
break;
// 等待被 DispatchQueue::enqueue() 唤醒
cond.Wait(lock);
}
lock.Unlock();
}
下面简单看一下在订阅者的模块中消息是怎样被放入 Pipe::out_q 中的:
Messenger::ms_deliver_dispatch()
--> Monitor::ms_dispatch()
--> Monitor::_ms_dispatch
--> Monitor::dispatch
--> Monitor::handle_mon_get_map
--> Monitor::send_latest_monmap
--> SimpleMessenger::send_message()
--> SimpleMessenger::_send_message()
--> SimpleMessenger::submit_message()
--> Pipe::_send()
bool Monitor::_ms_dispatch(Message *m)
{
ret = dispatch(s, m, src_is_mon);
if (s)
{
s->put();
}
return ret;
}
bool Monitor::dispatch(MonSession *s, Message *m, const bool src_is_mon)
{
switch (m->get_type())
{
case CEPH_MSG_MON_GET_MAP:
handle_mon_get_map(static_cast<MMonGetMap *>(m));
break;
// ......
default:
ret = false;
}
return ret;
}
void Monitor::handle_mon_get_map(MMonGetMap *m)
{
send_latest_monmap(m->get_connection().get());
m->put();
}
void Monitor::send_latest_monmap(Connection *con)
{
bufferlist bl;
monmap->encode(bl, con->get_features());
/**
* SimpleMessenger::send_message()
* --> SimpleMessenger::_send_message()
* --> SimpleMessenger::submit_message()
* --> Pipe::_send()
*/
messenger->send_message(new MMonMap(bl), con);
}
void Pipe::_send(Message *m)
{
assert(pipe_lock.is_locked());
out_q[m->get_priority()].push_back(m);
// 唤醒 Writer 线程
cond.Signal();
}
由上面的所有分析,除了订阅者/发布者设计模式,对网络包的处理上采用的是古老的 生产者消费者问题 线程模型,每次新的请求就会有创建一对收/发线程用来处理消息的接受 发送,如果有大规模的请求,线程的上下文切换会带来大量的开销,性能可能产生瓶颈。
https://www.cnblogs.com/yi-mu-xi/p/10144362.html
《网络层的处理》--基础知识
在现在的网络编程实现中,大多数项目都会采用基于事件通知的异步网络 IO 方式来实现。目前无论是 Epoll 还是 Kqueue 都已经成为主流网络编程知识,本文就不介绍这些基本概念和使用了。主要围绕目前 Ceph 的网络层实现来解构和探讨如何重构。
总体设计
在 Ceph 项目伊始的 06 年,那时候在网络编程还是线程模型当道的年头,Ceph 采用了简单粗暴的采用了每两个线程对应一个终端的方式,其中一个用于监听和读取该终端的读事件,另一个用于写事件。我们可以简单称这两个线程为一条连接的读线程和写线程。读线程得到请求以后会解析网络流并开始构建消息,然后派发到后面的 Dispatcher。写线程在大部分时候会处于 Sleep 状态,直到有新的消息需要发送才会被唤醒。
Ceph 在目前的网络层面上有三个重要概念,分别是 Messenger,Pipe,Connection。比如每个 OSD 中会有 cluster_messenger 和 public_messenger,顾名思义
因此 cluster_messenger 中负责的连接会全部是面向其他 OSD 或者 Monitor 的连接。,为了解决网络连接不稳定或者临时闪断连接的问题,Pipe 会一直维护面向一个终端地址的会话状态,如类似 TCP 包序号的消息序号和发送队列。Connection 就是一个 socket 的 wrapper,它从属于某一个 Pipe。
一个会话的逻辑
上图主要聚焦 OSD 端的网络逻辑。 OSD 端实际上是一模一样的网络实现,。网络上层的业务实际上只需要关注一个逻辑上的持久会话,通过 Dispatcher 得到消息处理,然后通过 Connection 接口把消息放到发送队列发送。
上面这个图主要描述了一个会话建立的流程,其中 banner 类似于一个宣告,然后互相了解对方的地址信息。主要逻辑在于 connection message 中的信息,服务器端会校验这些连接信息并确保面向这个地址的连接只有一条,如果 client 发送了一个已经建立会话的地址,服务器端会考虑是否需要替换或者废弃当前连接。另外在这里会需要确定是否要发送验证信息(图中省略)。在会话成功后,实际上就对双方当前的 Pipe 状态达成了一致。
线程问题
,也就是随着一个实体(如 OSD)的增加会线性增加一个实体的线程数目。线程的增加会导致严重的线程切换(Context Switch) 损耗,线程级的 Context Switch 大概在 us 级别,会影响延迟敏感性应用的性能并且对系统造成资源压力。
因此,在去年 Accelio 提交了基于开源的一个 RPC 库的高性能的 Ceph 网络层实现,另外,作者也在上个月完成了一个基于事件通知的异步 Messenger 实现,主要将之前同步的 SimpleMessenger 改造成状态机并且实现了一个简单的事件管理器。相对于 accelio 的实现,本人的实现希望避免引入一个巨大的 RPC 库造成诸多不便,会大大限制 Ceph 在网络层实现上的范围,也可能会引来第三方库使用的问题。而实际上 Ceph 只需要一个简单的事件管理器即可高效的达到目的。
作者:620T 链接:https://www.jianshu.com/p/a7ed0c75ba76