開發環境:Ubuntu VS Code
編譯器:g++
框架源碼下載:GitHub
認識RPC
RPC的全稱是遠程過程調用(Remote Procedure Call)。
什么是遠程過程調用呢?
那么對于一個聊天系統有int send_information(int friend_id,string msg)這個方法,我們的一個處理邏輯是不是這樣:
- 調用bool is_exist(int friend_id)判斷用戶是否在線
- 根據結果在決議是發送在線消息還是離線消息。
那么對于一個繼承了登錄和聊天功能的系統,我們在本地調用一個函數,就直接返回值=函數名(參數1,參數2),就直接執行了這個過程并且得到了返回值。
但是如果考慮高并發、高可用以及系統擴展性的話,那我們不得不引入分布式的設計。這意味這,登錄和聊天會部署在不同的機器上!那么要完成上面的邏輯,就不得不依靠網絡,將要調用的函數名以及參數通過網絡打包發送給另一個機器,然后由另一臺機器將結果發過來。
而我們要做的RPC框架就是為使這個過程更好用而設計的。
RPC框架的設計
博文的標題是基于 protobuf 和 zookeeper 的RPC框架,那么protobuf 和 zookeeper又在整個框架啊中扮演怎樣的角色呢?
protobuf 作用
protobuf 主要是作為整個框架的傳輸協議。我們可以看一下整個框架對于傳輸信息的格式定義:
{
bytes service_name = 1; //類名
bytes method_name = 2; //方法名
uint32 args_size = 3; //參數大小
}
可以看到,它定義了要調用方法是屬于哪個類的哪個方法以及這個方法所需要的的參數大小。
那么參數呢?是怎樣定義的?
首先我們來看一下我們框架內傳輸的數據是什么:4字節標識頭部長度+RpcHeader+args
* 16表示整個類名+方法名+參數長度的大小
有個這個長度,我們就可以從整個長度中截取UserServiceLogin15這一段
再根據RpcHeader來反序列話得出類名和方法名以及參數長度三個重要數據
* 15表示后面的參數長度
由于我們找到了類名和方法名,我們就可以在整個框架存儲這些信息的地方得到一個對于這個方法的描述。
然后借用protobuf的service對象提供的一個接口GetResponsePrototype,并且根據這個方法的描述來反序列化出參數。這個都是根據我們注冊的方法的描述來做的。
*/
18UserServiceLogin15zhang san123456
注:如果還是有點迷糊,可以保留參數解釋信息,看到后面就大致懂了
zookeeper
zookeeper 呢在這里其實主要就是起到了一個配置中心的目的。
什么意思呢?
zookeeper上面我們標識了每個類的方法所對應的分布式節點地址,當我們其他服務器想要RPC的時候,就去 zookeeper 上面查詢對應要調用的服務在哪個節點上。
注意:這里就相當于,我其他服務器來 zookeeper 查詢User::is_exist,然后會得到127.0.0.1:8001 這個值,這個值就是你布置這個功能的一個RPC節點的網絡標識符,然后向這個節點去發送參數并且等待這個節點的相應。
從框架的使用來認識
框架的使用一般都是在example目錄下的 callee/UserService.cpp 里面
//框架服務提供provider
RpcProvider provide;
provide.notify_service(new UserService());
provide.run();
可以看到,主要去做了三個事情:
- 首先 RPC 框架肯定是部署到一臺服務器上的,所以我們需要對這個服務器的 ip 和 port 進行初始化
- 然后創建一個 porvider(也就是server)對象,將當前 UserService 這個對象傳遞給他,也就是其實這個 RPC 框架和我們執行具體業務的節點是在同一個服務器上的。RPC框架負責解析其他服務器傳遞過來的請求,然后將這些參數傳遞給本地的方法。并將返回值返回給其他服務器。
- 最后是去讓這個 provider 去 run 起來。具體我們其實可以看一下源代碼:
void RpcProvider::run()
{
//獲取ip和port
string ip = RpcApplication::get_instance().get_configure().find_load("rpcserver_ip");
uint16_t port = atoi(RpcApplication::get_instance().get_configure().find_load("rpcserver_port").c_str());
//cout << ip << ":" << port << endl;
InetAddress address(ip, port);
//創建tcpserver對象
TcpServer server(&eventloop_, address, "RpcProvider");
//綁定鏈接回調和消息讀寫回調方法
server.setConnectionCallback(bind(&RpcProvider::on_connection, this, _1));
server.setMessageCallback(bind(&RpcProvider::on_message, this, _1, _2, _3));
//設置muduo庫的線程數量
server.setThreadNum(4);
//把當前rpc節點上要發布的服務全部注冊到zk上面,讓rpc client可以從zk上發現服務
ZookeeperClient zk_client;
zk_client.start();
//在配置中心中創建節點
for (auto &sp : service_map_)
{
string service_path = "/" + sp.first;
zk_client.create(service_path.c_str(), nullptr, 0);
for (auto &mp : sp.second.method_map_)
{
string method_path = service_path + "/" + mp.first;
char method_path_data[128] = {0};
sprintf(method_path_data, "%s:%d", ip.c_str(), port);
//ZOO_EPHEMERAL 表示znode時候臨時性節點
zk_client.create(method_path.c_str(), method_path_data, strlen(method_path_data), ZOO_EPHEMERAL);
}
}
RPC_LOG_INFO("server RpcProvider [ip: %s][port: %d]", ip.c_str(), port);
//啟動網絡服務
server.start();
eventloop_.loop();
}
可以看到,整個 run 其實就是干了這么幾件事情:
- 因為底層調用的是muduo網絡庫,所以這里會獲取ip地址和端口號,然后初始化網絡層
- 然后去設置一個連接回調以及發生讀寫事件時候的回調函數(稍后介紹)
- 然后設置整個 muduo 網絡庫工作的線程數量
- 然后創建zookeeper配置中心,將這些方法的信息以及本機的IP地址注冊到zookeeper
- 然后開啟本機服務器的事件循環,等待其他服務器的連接
其他服務器是怎樣調用的呢
我們看一下 example 目錄下的 caller/CallUserService.cpp 里面是怎樣調用的。
RpcApplication::init(argc, argv);
//演示調用遠程發布的rpc方法login
ik::UserServiceRpc_Stub stub(new RpcChannel());
//請求參數和響應
ik::LoginRequest request;
request.set_name("zhang san");
request.set_password("123456");
ik::LoginResponse response;
RpcControl control;
//發起rpc調用,等待響應返回
stub.Login(&control, &request, &response, nullptr);
//rpc調用完成,讀調用的結果
if (response.errmsg().error() == 0)
{
//沒錯誤
cout << "rpc login response: " << response.success() << endl;
}
else
{
//有錯誤
cout << "rpc login response errer: " << response.errmsg().error_msg() << endl;
}
同樣,也是有以下幾個步驟:
- 初始化 RPC 遠程調用要連接的服務器
- 定義一個 UserSeervice 的 stub 樁類,由這個裝類去調用Login方法,這個login方法可以去看一下源碼的定義:
void Login(::google::protobuf::RpcController *controller,
const ::ik::LoginRequest *request,
::ik::LoginResponse *response,
::google::protobuf::Closure *done)
{
//框架給業務上報了請求參數 request,業務獲取相應數據做本地業務
string name = request->name();
string password = request->password();
//本地業務
bool login_result = Login(name, password);
//把響應給調用方返回
ik::ErrorMsg *errmsg = response->mutable_errmsg();
errmsg->set_error(0);
errmsg->set_error_msg("");
response->set_success(login_result);
//執行回調操作
done->Run();
}
可以看到,Login的 RPC 重載函數有四個參數:controller(表示函數是否出錯)、request(參數)、response(返回值)、done(回調函數)
其主要做的也是去圍繞著解析參數,將參數放入本地調用的方法,將結果返回并執行回調函數。至于這個回調函數則是在服務端執行讀寫事件回調函數綁定的。
綁定的是如下方法:
{
string response_str;
//序列化
if (response->SerializeToString(&response_str))
{
//發送序列化的數據
conn->send(response_str);
}
else
{
//序列化失敗
RPC_LOG_ERROR("serialize reponse error");
}
//短鏈接
conn->shutdown();
}
它會將由bind函數綁定的參數:response_str 發送回去,然后調用方的服務器就會收到這個返回值并解析它。
樁類是干嘛的
那么其實現在來說,我們并沒有看到調用方是何時發送了要調用方法以及相應參數?
我們還是需要去返回樁類,這個是由protobuf自動去幫你生成的。
public:
UserServiceRpc_Stub(::google::protobuf::RpcChannel* channel);
UserServiceRpc_Stub(::google::protobuf::RpcChannel* channel,
::google::protobuf::Service::ChannelOwnership ownership);
~UserServiceRpc_Stub();
inline ::google::protobuf::RpcChannel* channel() { return channel_; }
// implements UserServiceRpc ------------------------------------------
void Login(::google::protobuf::RpcController* controller,
const ::ik::LoginRequest* request,
::ik::LoginResponse* response,
::google::protobuf::Closure* done);
void Register(::google::protobuf::RpcController* controller,
const ::ik::RegisterRequest* request,
::ik::RegisterResponse* response,
::google::protobuf::Closure* done);
private:
::google::protobuf::RpcChannel* channel_;
bool owns_channel_;
GOOGLE_DISALLOW_EVIL_CONSTRUCTORS(UserServiceRpc_Stub);
};
我們在定義樁類的時候,會傳入一個RpcCannel的指針,這個綁定到這個樁類的channel_指針。
當我們去調用這個樁類的Login方法的時候,會去調用傳遞進來的channel的CallMethod方法:
const ::ik::LoginRequest* request,
::ik::LoginResponse* response,
::google::protobuf::Closure* done) {
channel_->CallMethod(descriptor()->method(0),
controller, request, response, done);
}
所以,顯而易見,我去發送這些方法啊,參數啊,都是在CallMethod這個方法中執行的。
那么CallMethod里面執行的內容對我們理解RPC調用體系至關重要(代碼比較長,可以直接跳過聽結論):
google::protobuf::RpcController *controller,
const google::protobuf::Message *request,
google::protobuf::Message *response,
google::protobuf::Closure *done)
{
const google::protobuf::ServiceDescriptor *service_des = method->service();
string service_name = service_des->name();
string method_name = method->name();
//獲取參數的序列化字符串長度 args_size
int args_size = 0;
string args_str;
if (request->SerializeToString(&args_str))
{
args_size = args_str.size();
}
else
{
controller->SetFailed("serialize request error!");
return;
}
//定義rpc的請求header
ikrpc::RpcHeader rpc_header;
rpc_header.set_service_name(service_name);
rpc_header.set_method_name(method_name);
rpc_header.set_args_size(args_size);
uint32_t header_size = 0;
string rpc_header_str;
if (rpc_header.SerializeToString(&rpc_header_str))
{
//序列化成功
header_size = rpc_header_str.size();
}
else
{
//序列化失敗
controller->SetFailed("serialize rpc_header error!");
return;
}
//組織待發送的rpc請求的字符串
string send_rpc_str;
send_rpc_str.insert(0, string((char *)&header_size, 4)); //header_size
send_rpc_str += rpc_header_str; //rpc_header
send_rpc_str += args_str; //args_str
//使用tcp編程,完成rpc方法的遠程調用
int client_fd = socket(AF_INET, SOCK_STREAM, 0);
if (client_fd == -1)
{
close(client_fd);
RPC_LOG_FATAL("create socket error! errno:%d", errno);
}
//獲取ip和port
ZookeeperClient zk_client;
zk_client.start();
string method_path = "/" + service_name + "/" + method_name;
string host_data = zk_client.get_data(method_path.c_str());
if(host_data == "")
{
controller->SetFailed(method_path+" is not exist");
return;
}
int host_index = host_data.find(":");
if(host_index == -1)
{
controller->SetFailed(method_path+" address is invalid!");
return;
}
string ip = host_data.substr(0,host_index);
uint16_t port = atoi(host_data.substr(host_index+1,host_data.size()-host_index).c_str());
struct sockaddr_in server_addr;
server_addr.sin_family = AF_INET;
server_addr.sin_port = htons(port);
server_addr.sin_addr.s_addr = inet_addr(ip.c_str());
if (connect(client_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1)
{
close(client_fd);
RPC_LOG_FATAL("connet error! errno: %d", errno);
}
// 發送rpc請求
if (send(client_fd, send_rpc_str.c_str(), send_rpc_str.size(), 0) == -1)
{
controller->SetFailed("send error! errno: " + errno);
close(client_fd);
return;
}
//接受rpc請求
char recv_buffer[BUFF_SIZE] = {0};
ssize_t recv_size = recv(client_fd, recv_buffer, BUFF_SIZE, 0);
if (recv_size == -1)
{
controller->SetFailed("recv error! errno: " + errno);
close(client_fd);
return;
}
//反序列化響應數據
//String 因為遇到?會認為是字符串結束,所以用Array
if (!response->ParseFromArray(recv_buffer, recv_size))
{
string recv = recv_buffer;
controller->SetFailed("parse error! response_str: " + recv);
close(client_fd);
return;
}
close(client_fd);
}
- 組織要發送的 request_str 字符串
- 從zookeeper中拿到服務端的 ip 和 port,連接服務端
- 發送 request_str
- 接受服務端返回過來的 response 字符串并反序列化出結果
總結一下RPC流程
-
RPC
+關注
關注
0文章
111瀏覽量
11515 -
源碼
+關注
關注
8文章
633瀏覽量
29140 -
函數
+關注
關注
3文章
4307瀏覽量
62433 -
遠程過程調用
+關注
關注
0文章
2瀏覽量
705
發布評論請先 登錄
相關推薦
評論