Contents
  1. 1. 问题
  2. 2. 协议
  3. 3. 网络层
  4. 4. Sender和Receiver
  5. 5. 逻辑层
  6. 6. 总结

问题

  1. 程序如何“交流”?
  2. 怎样更有效率的“沟通”?
  3. 代码怎样写的舒服?

协议

关于第一点,因为现在异地IPC用socket比较多,所以就拿这个来说。
我们知道socket本质上是全双工的数据流,为了使对方能够从杂乱无章的数据中“分拣”出正确的信息来,我们就需要给通信双方明确一份“协议”。
“协议”是需要对数据的构成规则进行描述,A利用“协议”生存数据,B用同一份“协议”还原数据。即这种形式:

发送端:用户数据 -> 协议 -> 待发送的无格式数据 -> 网络层

接收端:网络层 -> 收到的无格式数据 -> 协议 -> 用户数据

一次网络通信需要好多种协议参与:应用层的用户消息协议、网络层的网络协议(TCP、UDP)、链路层协议、路由上的协议等等..

网络层及以下的协议主要为了保障用户层数据正确发送和接收,这个先不管,专门人负责专门事,开发的时候我们专注解决应用层的收发问题。
我理解的应用层协议分为两个部分,一个部分是用户消息的消息头(ID)和数据结构需要用户明确给出,还有一个部分是隐性存在的——消息的基本格式——这部分是为辅助对端恢复数据格式而设,通常包括整条消息的长度数据及其字节数、消息头字节数及它们的排列顺序等,这部分不需要用户显示在代码中给出,但收发端都要基于这个隐性要求通信。

所以从应用层角度,一条完整消息常见格式是这样:

消息长度+消息头+数据结构

关于协议应用举个栗子(随手写的味精测试):
发送端:

const uint32 _BUF_LEN = 4096;
// 我们的数据包括:
uint32 pkg_len;
uint32 msg_id;
MsgStruct msg;

// 待发数据缓冲区
char outbuf[_BUF_LEN+1];

// 按照隐性协议,我们需要先把包长度写进去
pkg_len = sizeof(pkg_len) + sizeof(msg_id) + sizeof(msg);
sprintf(buf, "%d", pkg_len);
// 再把消息id写进去
sprintf(buf+sizeof(pkg_len), "%d", msgid);
// 然后写内容
sprintf(buf+sizeof(pkg_len)+sizeof(msg_id), "%d%d%d", msg.a, msg.b, msg.c);
// 发送
send(sockfd, outbuf, pkg_len);

接收端:

const uint32 _BUF_LEN = 4096;

uint32 pkg_len;
uint32 msg_id;

//接收缓冲区
static char inbuf[_BUF_LEN+1];
// 当前缓冲未处理的数据长度
static int32 len = 0;
// 根据隐性协议先读包长度
assert(len <= _BUF_LEN);
int32 tmpsize = read(sockfd, inbuf+len, _BUF_LEN-len);
if (tmpsize <= 0)
{
	// 断线处理
	return;
}
// 调整缓冲区游标
len += tmpsize;
if (len < sizeof(pkg_len))
	return;	// 收到的数据连包长都不够读,果断先不处理
// 读包长度
sscanf(buf, "%d", &pkg_len);
if (len < pkg_len)
	return;	// 一条消息的数据不完整,再等等..
// 读消息id
sscanf(buf+sizeof(pkg_len), "%d", &msgid);
// 消息细加工,根据各个消息各自数据结构解析数据
OnRemoteMsg(msgid, buf+sizeof(pkg_len)+sizeof(msg_id));
// 调整缓冲区游标
len -= pkg_len;
assert(len >= 0);

这个栗子可以看出…就算不考虑网络层的问题,单处理协议也是比较麻烦的,而且要用这种方法处理变长字符串也很麻烦。更别说消息的压缩和处理效率等等的问题。

再把网络层的问题考虑进来,需要考虑各种错误处理、缓冲区满的情况(这里没处理sendbuf满导致的问题)。而这离我们期望的傻瓜化的应用编程还差的很远:

// 发送
MsgDef msg;
SendMsg(msg);

// 接收
void OnRemoteMsg(const MsgDef& msg)
{
	// TODO.
}

网络层

所以为了逻辑层接口变得尽可能的清晰简单(不需要考虑任何通信细节),我们就要运用各种办法进行逐层封装。

首先是网络层,网络层目前比较清晰的封装是用Reactor模式,对各种网络事件注册回调函数,然后每个连接需要对外提供send和onrecv方法,所有可能遇到的网络层问题都在网络层处理。

一般网络库基本也都是类似的形式,但外部进行使用的时候最好再做一层接口,隔离网络层的实现,方便日后改用其它网络库。

比如说,我们先用对网络库进行c++ 、onconnected等

然后在逻辑层就CReactor* reactor = new CLEReactor; 以后就以CReactor接口进行操作。
对于网络事件的处理通常也需要通知逻辑层进行处理,这样我们也只需要在CReactor中添加一个IEventHandler成员和set方法,并且再每次事件发生时调用它。然后逻辑层继承IEventHandler写好逻辑好set给reactor即可。

而网络连接,也会涉及到具体库的编程,所以我们可以定义成CLEConnection类,并让它继承CConnection(提供Send、OnRecv等方法)。让IEventHandler只“知道”CConnection,然后逻辑层就屏蔽了对底层网络库的依赖。

因为网络层只提供基本的send和onrecv接口,但这个接口和逻辑层的应用还有差距,所以我们考虑在send和onrecv的地方再加一层封装。

大致结构比如,实现一个CMsgSender类和CMsgReceiver类,分别用来提供SendMsg(const MsgType& msg)接口和OnRemoteMsg(const MsgType& msg)接口,把具体的收发逻辑封装在这里,然后由逻辑层的Connection对象来继承。

这样我们在逻辑层需要用到网络功能的时候就非常简便了。因为CMsgSender和CMsgReceiver可能采用第三方库来实现,所以也可以用类似上面的思路对底层库进行隔离。

Sender和Receiver

下面就来讨论CMsgSender和CMsgReceiver的实现问题:

CMsgSender:

首先考虑组织数据的问题,即如何进行MsgType => bytedata的转换。

首先是包长,这个好办,pkg_len = sizeof(pkg_len) + sizeof(msg_id) + sizeof(msg)

但是msg_id还不知道,我们既不希望每条网络消息的数据结构都额外带一条数据叫msgid,也不希望每次调用SendMsg接口的时候都额外传一个叫做msgid的参数,当然也不希望创建一个msg_2_id的map来查表,更不希望对每个msgtype重载一个函数…

通过一个数据得到另外一个数据无非就是这个数据本身包含它,要不就是查表,别无他法。所以排除前者,建立msg到id的映射关系肯定是必要的。

但我们建立映射的时候不能从obj到id,只能是type到id,但是这种编译期数据->运行期数据的映射是不好做的。

重点就在把id也弄成编译期数据…

为了能漂亮的从msg中弄到msgid,我们需要借助一些“编译期工具”。

我们定义消息id的时候一般也顺便定义这条消息的数据结构,很自然的想到在这个地方建立映射关系:

namespace PtlMapper
{
	template <uint32 msgid> struct GetTypeById { };
	template <typename MsgType> struct GetIdByType { };
}

#define CREATE_MAPPER(msgid, msgtype) \
namespace PtlMapper { \
	template < > struct GetTypeById<msgid> { typedef msgtype res; }; \
	template < > struct GetIdByType<msgtype> { enum { res = msgid }; }; \
}

这样我们就可以通过 CREATE_MAPPER 这个宏对映射关系进行创建,然后通过PtlMapper::GetIdByType::res获得这个消息类对应的msgid。
而通过obj获得它的type相对容易很多的,我们只需要把SendMsg做成模板函数,像这样:

template <typename MsgType> void SendMsg(const MsgType& msg);

我们就可以通过MsgType来访问msg的具体类型,最终我们这样拿到id。

template <typename MsgType>
void CMsgSender::SendMsg(const MsgType& msg)
{
	uint32 msgid = PtlMapper::GetIdByType<MsgType>::res;
	//...

然后对于数据的转换,我们可以通过protobuf来做:

uint32 msgid = PtlMapper::GetIdByType<PBMsgType>::res;
uint32 msglen = msg.ByteSize();
uint32 prefixlen = sizeof(msgid) + sizeof(msglen);
uint32 bufferlen = prefixlen + msglen;

static uint8 buf[_BUF_SIZE];
google::protobuf::io::ArrayOutputStream array_output(buf, bufferlen);
google::protobuf::io::CodedOutputStream coded_output(&array_output);

coded_output.WriteLittleEndian32(bufferlen);
coded_output.WriteLittleEndian32(msgid);

msg.SerializeToCodedStream(&coded_output);

再就是发送,由于采用继承的方式来做,所以实际上this对象本身就是connection对象,理应具有send功能。
所以一种做法就是,把this转型为子类指针,然后调用子类的send方法。因为这种方法需要Sender知道子类的类型,所以为了提供类型,只能把Sender类本身进行模板化,把子类的类型放到模板参数里去。

就像这样:

template <typename HolderType>
template <typename PBMsgType>
void TPBMsgSender<HolderType>::SendMsg(const PBMsgType& msg)
{
	auto holder = static_cast<HolderType*>(this);
	//...
	holder->Send((const char*)buf, bufferlen);
}

或者,也可以在Sender类中加一个纯虚函数,比如叫RawSend,由子类去实现。然后直接调RawSend方法发送即可。

CMsgReceiver:

Receiver的实现和Sender有些区别,一点区别就是send是主动调,onrecv是被动触发,需要从connection触发后转发给receiver的onrecv函数处理。

另外,protobuf恢复数据需要知道数据类型,而通过网络层得到的msgid是运行期的,没办法再用编译期的招数了,只能手动建map。

为了获得类型,我们还是要利用模板函数的,map大致上是这样建:

// part A
typedef void (TPBMsgReceiver<HolderType>::*HandleFunc)(const uint8* data, size_t sz);
typedef map<uint32, HandleFunc> MapHandleFunc;

// part B
template <typename HolderType>
void TPBMsgReceiver<HolderType>::RegisterHandleFunc(uint32 msgid, HandleFunc func)
{
	s_maphandle[msgid] = func;
}

// part C
template <typename HolderType>
template <uint32 msgid>
void TPBMsgReceiver<HolderType>::OnRemoteMsg(const uint8* data, size_t sz)
{
	PtlMapper::GetTypeById<msgid>::res msg;
	msg.ParseFromArray(data, sz);
	static_cast<HolderType*>(this)->OnRemoteMsg(msg);;
}

// part D
// 然后子类中...
typedef TPBMsgReceiver<CPipeEventServerHandler> receiver_t;
RegisterHandleFunc(eC2S_Test, &receiver_t::OnRemoteMsg<eC2S_Test>);
RegisterHandleFunc(eC2S_Hello, &receiver_t::OnRemoteMsg<eC2S_Hello>);

也可以考虑把part C放到子类去,这样就省的转型了,但HolderType的模板参数还是不能省,定义HandleFunc和下面OnRecv转发的时候还是会用到。

这样再把消息处理的一些细节加进来,最终OnRecv大概是这样:

template <typename HolderType>
typename TPBMsgReceiver<HolderType>::HandleFunc
TPBMsgReceiver<HolderType>::GetHandleFunc(uint32 msgid)
{
	return s_maphandle[msgid];
}

template <typename HolderType>
size_t TPBMsgReceiver<HolderType>::OnRecv(const uint8* data, size_t size)
{
	using google::protobuf::io::CodedInputStream;

	static uint32 buflen = 0;
	static uint32 msgid = 0;
	static uint32 prefixlen = sizeof(buflen) + sizeof(msgid);

	if (size < sizeof(buflen))
	{
		return 0;
	}

	CodedInputStream::ReadLittleEndian32FromArray(data, &buflen);

	if (size < buflen)
	{
		return 0;
	}
	
	CodedInputStream::ReadLittleEndian32FromArray(data+sizeof(buflen), &msgid);

	HandleFunc func = GetHandleFunc(msgid);
	if (!func)
	{
		cerr << "invalid msg id: " << msgid << endl;
		return buflen;	// drop the data
	}
	
	(this->*func)(data+prefixlen, buflen - prefixlen);

	return buflen;
}

逻辑层

最后应用的时候,只需要实现一个static void InitMsgHandle();方法,通信开始前找个地方调一下。

然后实现各个OnRemoteMsg(const MsgType& msg)方法,逻辑直接在这些方法里面处理即可。

因为这种处理消息的方法又多又杂,都包含在Connection类中会把它撑的比较大,可以考虑把其中OnRemoteMsg的实现部分放到另一个cpp文件中去。

总结

因为C++没有内建的rpc机制,所以我们编码的目标并不是真正实现rpc的形式,而是实现一种比较趋近的伪RPC——使用简单易用的方法来实现网络通信。实际上像C++这种语言真强要去设计那样的格式,也未必真做不到,只是要借助很多的宏了,未必像现在这样做更好。

Contents
  1. 1. 问题
  2. 2. 协议
  3. 3. 网络层
  4. 4. Sender和Receiver
  5. 5. 逻辑层
  6. 6. 总结