A Tutorial on the Principles of FFLIB, a C++ Multi-Process Concurrency Framework

Updated 2025-01-16. From: SLTechnology News&Howtos (shulou)

Many newcomers are unclear about how FFLIB, a C++ multi-process concurrency framework, works. This tutorial explains its principles in detail through a series of examples.

FFLIB adopts the Broker pattern and borrows ideas from MPI and Erlang.

About MPI: http://www.mcs.anl.gov/research/projects/mpi/

About Erlang: http://www.erlang.org/

FFLIB is currently in the alpha phase, and some useful features have yet to be added, but it was designed from the very beginning to solve practical problems. The Broker can run as a stand-alone process or be started inside another process. The network layer, which FFLIB implements with epoll, also has some reference value. There is ongoing discussion online about whether epoll's ET (edge-triggered) or LT (level-triggered) mode is simpler to use; my answer is ET. In ET mode, epoll behaves as a complete state machine, and the developer only needs to handle three states for each FD: read, write and error.
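The three ET states can be pictured with a small Linux-only sketch. It is not FFLIB's network layer: fd_state_t, watch_fd, on_read, on_write and poll_once are hypothetical names chosen for illustration. The point it shows is that, in ET mode, each handler must keep reading or writing until the kernel reports EAGAIN, because the event is delivered only once per state change.

```cpp
#include <sys/epoll.h>
#include <unistd.h>
#include <fcntl.h>
#include <cerrno>
#include <string>

// Hypothetical per-descriptor state; not an FFLIB type.
struct fd_state_t {
    explicit fd_state_t(int fd_) : fd(fd_), closed(false) {}
    int fd;
    std::string inbox;   // bytes received so far
    std::string outbox;  // bytes waiting to be sent
    bool closed;         // set on EOF or on an error
};

// ET mode requires non-blocking descriptors.
bool set_nonblocking(int fd) {
    int flags = fcntl(fd, F_GETFL, 0);
    return flags != -1 && fcntl(fd, F_SETFL, flags | O_NONBLOCK) != -1;
}

// Register the descriptor for edge-triggered read and write events.
bool watch_fd(int epfd, fd_state_t* st) {
    epoll_event ev = epoll_event();
    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
    ev.data.ptr = st;
    return epoll_ctl(epfd, EPOLL_CTL_ADD, st->fd, &ev) == 0;
}

// Read state: drain the descriptor until EAGAIN.
void on_read(fd_state_t* st) {
    char buf[4096];
    for (;;) {
        ssize_t n = read(st->fd, buf, sizeof(buf));
        if (n > 0) { st->inbox.append(buf, static_cast<size_t>(n)); continue; }
        if (n == 0) { st->closed = true; return; }             // peer closed
        if (errno == EINTR) continue;
        if (errno == EAGAIN || errno == EWOULDBLOCK) return;   // fully drained
        st->closed = true;                                     // real error
        return;
    }
}

// Write state: flush the outbox until it is empty or EAGAIN.
void on_write(fd_state_t* st) {
    while (!st->outbox.empty()) {
        ssize_t n = write(st->fd, st->outbox.data(), st->outbox.size());
        if (n > 0) { st->outbox.erase(0, static_cast<size_t>(n)); continue; }
        if (n < 0 && errno == EINTR) continue;
        if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) return;
        st->closed = true;
        return;
    }
}

// One pass of the event loop: wait, then dispatch read, write and error.
int poll_once(int epfd, int timeout_ms) {
    epoll_event events[16];
    int n = epoll_wait(epfd, events, 16, timeout_ms);
    for (int i = 0; i < n; ++i) {
        fd_state_t* st = static_cast<fd_state_t*>(events[i].data.ptr);
        unsigned int ev = events[i].events;
        if (ev & (EPOLLIN | EPOLLHUP)) on_read(st);
        if ((ev & EPOLLOUT) && !st->closed) on_write(st);
        if (ev & EPOLLERR) st->closed = true;                  // error state
    }
    return n;
}
```

A caller creates the epoll instance with epoll_create1(0), registers each descriptor once with watch_fd, and then calls poll_once in a loop. Nothing is re-armed between events, which is what makes the ET handlers read like a state machine.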

To explore FFLIB further, I wrote this tutorial and built more usage examples. Game development and other distributed environments share many familiar communication patterns. This article uses the following patterns as FFLIB examples:

Request/Reply

Point to point communication

Blocking communication

Multicast communication

Map/Reduce

Request/Reply

Asynchronous Request/Reply

Every message in FFLIB pairs one Request with one Reply, and calls are asynchronous by default. Suppose the following scenario: a Flash client connects to GatewayServer and sends a Login packet; GatewayServer parses the username and password and calls LoginServer to verify them.

First define msg:

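    struct user_login_t
    {
        struct in_t: public msg_i
        {
            in_t(): msg_i("user_login_t::in_t") {}
            string encode()
            {
                return (init_encoder() << uid << value).get_buff();
            }
            void decode(const string& src_buff_)
            {
                init_decoder(src_buff_) >> uid >> value;
            }
            long uid;
            string value;
        };
        struct out_t: public msg_i
        {
            out_t(): msg_i("user_login_t::out_t") {}
            string encode()
            {
                return (init_encoder() << value).get_buff();
            }
            void decode(const string& src_buff_)
            {
                init_decoder(src_buff_) >> value;
            }
            bool value;
        };
    };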

This is how the interface is defined in LoginServer:

    class login_server_t
    {
    public:
        void verify(user_login_t::in_t& in_msg_, rpc_callcack_t& cb_)
        {
            user_login_t::out_t out;
            out.value = true;
            cb_(out);
        }
    };

    login_server_t login_server;
    singleton_t::instance().create_service("login_server", 1)
        .bind_service(&login_server)
        .reg(&login_server_t::verify);

Call the above API in GatewayServer:

    struct lambda_t
    {
        static void callback(user_login_t::out_t& msg_, socket_ptr_t socket_)
        {
            if (true == msg_.value)
            {
                //! socket_->send_msg("login ok");
            }
            else
            {
                //! socket_->send_msg("login failed");
            }
        }
    };

    user_login_t::in_t in;
    in.uid = 520;
    in.value = "ILoveYou";
    socket_ptr_t flash_socket = NULL; //! TODO
    // look the service up under the name it was registered with
    singleton_t::instance().get_service_group("login_server")
        ->get_service(1)
        ->async_call(in, binder_t::callback(&lambda_t::callback, flash_socket));

As shown above, async_call can bind parameters to the callback function through the binder_t template function.
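The idea of binding an extra parameter to a reply callback can be sketched in standard C++ without FFLIB. Everything here is an assumption made for illustration: mini_login_service_t is a hypothetical in-process stand-in for a remote service, std::bind plays the role that binder_t plays in the article, and the "verification" rule is invented so the example has something to check.

```cpp
#include <cstddef>
#include <functional>
#include <queue>
#include <string>
#include <vector>

struct login_in_t  { long uid; std::string value; };
struct login_out_t { bool value; };

// Hypothetical in-process stand-in for a remote login service.
class mini_login_service_t {
public:
    typedef std::function<void(const login_out_t&)> reply_cb_t;

    // Queue the request and return immediately; the reply comes later.
    void async_call(const login_in_t& in, reply_cb_t cb) {
        pending_.push([in, cb]() {
            login_out_t out;
            out.value = (in.value == "ILoveYou");  // toy verification rule
            cb(out);
        });
    }

    // Deliver every queued reply; returns how many were delivered.
    std::size_t run_pending() {
        std::size_t delivered = 0;
        while (!pending_.empty()) {
            std::function<void()> job = pending_.front();
            pending_.pop();
            job();
            ++delivered;
        }
        return delivered;
    }

private:
    std::queue<std::function<void()> > pending_;
};

// Reply handler that needs one extra argument besides the reply itself.
// The vector stands in for the client socket the result is sent to.
void on_login(const login_out_t& out, std::vector<std::string>* sent) {
    sent->push_back(out.value ? "login ok" : "login failed");
}
```

A call then looks like svc.async_call(in, std::bind(&on_login, std::placeholders::_1, &sent)). The caller continues at once, and on_login runs with both the reply and the bound pointer when run_pending delivers the result.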

Synchronous Request/Reply

Most of the time we want the Reply processed asynchronously, but sometimes it must be processed before any later operation can start, typically during program initialization. Suppose the following scenario: when SceneServer starts, it must obtain its configuration from SuperServer before it can continue initializing, for example by loading scene data.
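One way to picture a blocking call built on top of an asynchronous one is the single-threaded sketch below. It is only an illustration: mini_super_server_t and call_and_wait are hypothetical names, and the wrapper blocks by delivering queued events until the reply arrives, which is an assumption of this sketch and may differ from how FFLIB's rpc_future_t waits.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <queue>
#include <string>

struct config_in_t  { int server_type; int server_id; };
struct config_out_t { std::map<std::string, std::string> value; };

// Hypothetical in-process stand-in for SuperServer.
class mini_super_server_t {
public:
    typedef std::function<void(const config_out_t&)> reply_cb_t;

    // Asynchronous interface: queue the request, reply later.
    void async_call(const config_in_t& in, reply_cb_t cb) {
        pending_.push([in, cb]() {
            config_out_t out;
            out.value["server_id"] = std::to_string(in.server_id);
            cb(out);
        });
    }

    // Deliver one queued reply; returns false when nothing is pending.
    bool run_one() {
        if (pending_.empty()) return false;
        std::function<void()> job = pending_.front();
        pending_.pop();
        job();
        return true;
    }

private:
    std::queue<std::function<void()> > pending_;
};

// Blocking wrapper: issue the async call, then do not return until the
// reply has been stored in out. Returns false if no reply can arrive.
bool call_and_wait(mini_super_server_t& server, const config_in_t& in,
                   config_out_t& out) {
    bool done = false;
    server.async_call(in, [&done, &out](const config_out_t& reply) {
        out = reply;
        done = true;
    });
    while (!done) {
        if (!server.run_one()) return false;
    }
    return true;
}
```

The caller sees an ordinary function that returns the configuration, so initialization code can run strictly after the reply, while the service itself keeps a purely asynchronous interface.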

First define the msg of the communication:

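    struct config_t
    {
        struct in_t: public msg_i
        {
            in_t(): msg_i("config_t::in_t") {}
            string encode()
            {
                return (init_encoder() << server_type << server_id).get_buff();
            }
            void decode(const string& src_buff_)
            {
                init_decoder(src_buff_) >> server_type >> server_id;
            }
            int server_type;
            int server_id;
        };
        struct out_t: public msg_i
        {
            out_t(): msg_i("config_t::out_t") {}
            string encode()
            {
                return (init_encoder() << value).get_buff();
            }
            void decode(const string& src_buff_)
            {
                init_decoder(src_buff_) >> value;
            }
            map value;
        };
    };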

As shown above, msg serialization automatically supports map.
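How an encoder with stream operators can support map alongside simple fields is sketched below. This is not FFLIB's wire format or its encoder classes: mini_encoder_t and mini_decoder_t are hypothetical, and the layout (raw fixed-size integers, length-prefixed strings, count-prefixed maps, host byte order) is an assumption made to keep the example short.

```cpp
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <map>
#include <stdexcept>
#include <string>

// Hypothetical encoder: appends each value to a byte buffer.
class mini_encoder_t {
public:
    mini_encoder_t& operator<<(std::int64_t v) { put(&v, sizeof(v)); return *this; }
    mini_encoder_t& operator<<(const std::string& s) {
        std::uint32_t n = static_cast<std::uint32_t>(s.size());
        put(&n, sizeof(n));           // length prefix
        buff_.append(s);
        return *this;
    }
    // A map is its element count followed by each key and value in turn.
    template <typename K, typename V>
    mini_encoder_t& operator<<(const std::map<K, V>& m) {
        std::uint32_t n = static_cast<std::uint32_t>(m.size());
        put(&n, sizeof(n));
        for (typename std::map<K, V>::const_iterator it = m.begin(); it != m.end(); ++it) {
            *this << it->first << it->second;
        }
        return *this;
    }
    const std::string& get_buff() const { return buff_; }
private:
    void put(const void* p, std::size_t n) { buff_.append(static_cast<const char*>(p), n); }
    std::string buff_;
};

// Hypothetical decoder: reads values back in the order they were written.
class mini_decoder_t {
public:
    explicit mini_decoder_t(const std::string& buff) : buff_(buff), pos_(0) {}
    mini_decoder_t& operator>>(std::int64_t& v) { get(&v, sizeof(v)); return *this; }
    mini_decoder_t& operator>>(std::string& s) {
        std::uint32_t n = 0;
        get(&n, sizeof(n));
        if (buff_.size() - pos_ < n) throw std::runtime_error("truncated message");
        s.assign(buff_, pos_, n);
        pos_ += n;
        return *this;
    }
    template <typename K, typename V>
    mini_decoder_t& operator>>(std::map<K, V>& m) {
        std::uint32_t n = 0;
        get(&n, sizeof(n));
        m.clear();
        for (std::uint32_t i = 0; i < n; ++i) {
            K key;
            V val;
            *this >> key >> val;
            m[key] = val;
        }
        return *this;
    }
private:
    void get(void* p, std::size_t n) {
        if (buff_.size() - pos_ < n) throw std::runtime_error("truncated message");
        std::memcpy(p, buff_.data() + pos_, n);
        pos_ += n;
    }
    std::string buff_;
    std::size_t pos_;
};
```

Because the map overload is written in terms of the element overloads, any key or value type the encoder already handles works inside a map without extra code, which is the sense in which map support comes automatically.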

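SuperServer registers the interface that returns the configuration:

    super_server_t super_server;
    singleton_t::instance().create_service("super_server", 1)
        .bind_service(&super_server)
        .reg(&super_server_t::get_config);

SceneServer can then issue a synchronous Request/Reply:

    rpc_future_t rpc_future;
    config_t::in_t in;
    in.server_type = 1;
    in.server_id = 1;
    const config_t::out_t& out = rpc_future.call(
        singleton_t::instance().get_service_group("super_server")->get_service(1), in);
    // out.value now holds the configuration returned by SuperServer

Point to point communication

Define the msg of the communication. user_online_t, used in the code that follows, has the same shape as force_user_offline_t: a uid as input and a bool value as output.

    struct force_user_offline_t
    {
        struct in_t: public msg_i
        {
            in_t(): msg_i("force_user_offline_t::in_t") {}
            string encode()
            {
                return (init_encoder() << uid).get_buff();
            }
            void decode(const string& src_buff_)
            {
                init_decoder(src_buff_) >> uid;
            }
            long uid;
        };
        struct out_t: public msg_i
        {
            out_t(): msg_i("force_user_offline_t::out_t") {}
            string encode()
            {
                return (init_encoder() << value).get_buff();
            }
            void decode(const string& src_buff_)
            {
                init_decoder(src_buff_) >> value;
            }
            bool value;
        };
    };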

GatewayServer notifies SessionServer when a user comes online, and provides an interface for forcing a user offline:

    class gateway_server_t
    {
    public:
        void force_user_offline(force_user_offline_t::in_t& in_msg_, rpc_callcack_t& cb_)
        {
            //! close user socket
            force_user_offline_t::out_t out;
            out.value = true;
            cb_(out);
        }
    };

    gateway_server_t gateway_server;
    singleton_t::instance().create_service("gateway_server", 1)
        .bind_service(&gateway_server)
        .reg(&gateway_server_t::force_user_offline);

    user_online_t::in_t in;
    in.uid = 520;
    singleton_t::instance().get_service_group("session_server")
        ->get_service(1)
        ->async_call(in, callback_TODO);

SessionServer provides the interface that handles a user coming online, and may call GatewayServer's interface to force a user offline:

    class session_server_t
    {
    public:
        void user_login(user_online_t::in_t& in_msg_, rpc_callcack_t& cb_)
        {
            //! record that the user is online
            user_online_t::out_t out;
            out.value = true;
            cb_(out);
        }
    };

    session_server_t session_server;
    singleton_t::instance().create_service("session_server", 1)
        .bind_service(&session_server)
        .reg(&session_server_t::user_login);

    force_user_offline_t::in_t in;
    in.uid = 520;
    singleton_t::instance().get_service_group("gateway_server")
        ->get_service(1)
        ->async_call(in, callback_TODO);

Multicast communication

As with point-to-point communication, multicast only requires knowing the service name of the target. FFLIB also has the concept of a service group. For example, when several SceneServer instances are started, they hold different data but expose exactly the same interface, and may simply be different instances of the same program. FFLIB places such services in one service group and assigns an index id to each instance.

Suppose the following scenario: SuperServer must implement a GM interface that notifies every SceneServer to reload its configuration.

Define the msg of the communication:

    struct reload_config_t
    {
        struct in_t: public msg_i
        {
            in_t(): msg_i("reload_config_t::in_t") {}
            string encode()
            {
                return (init_encoder()).get_buff();
            }
            void decode(const string& src_buff_)
            {
                init_decoder(src_buff_);
            }
        };
        struct out_t: public msg_i
        {
            out_t(): msg_i("reload_config_t::out_t") {}
            string encode()
            {
                return (init_encoder() << value).get_buff();
            }
            void decode(const string& src_buff_)
            {
                init_decoder(src_buff_) >> value;
            }
            bool value;
        };
    };

SceneServer provides a reload configuration interface:

    class scene_server_t
    {
    public:
        void reload_config(reload_config_t::in_t& in_msg_, rpc_callcack_t& cb_)
        {
            //! reload the configuration here
            reload_config_t::out_t out;
            out.value = true;
            cb_(out);
        }
    };

    scene_server_t scene_server;
    singleton_t::instance().create_service("scene_server", 1)
        .bind_service(&scene_server)
        .reg(&scene_server_t::reload_config);

This is how multicast is implemented in SuperServer (strictly speaking it is closer to a broadcast, since every instance in the group is called):

    struct lambda_t
    {
        static void reload_config(rpc_service_t* rs_)
        {
            reload_config_t::in_t in;
            rs_->async_call(in, callback_TODO);
        }
    };

    singleton_t::instance().get_service_group("scene_server")
        ->foreach(&lambda_t::reload_config);
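The relationship between a service name, a service group and the index ids inside it can be sketched with two maps. The classes below are hypothetical and live in a single process; they only mirror the lookup-then-foreach shape of the call above and say nothing about how FFLIB routes messages through the Broker.

```cpp
#include <cstddef>
#include <functional>
#include <map>
#include <string>

// Hypothetical service instance; reload_count lets a caller observe calls.
struct mini_scene_service_t {
    explicit mini_scene_service_t(int id_) : id(id_), reload_count(0) {}
    void reload_config() { ++reload_count; }
    int id;
    int reload_count;
};

// A group holds instances that share one interface, keyed by index id.
class mini_service_group_t {
public:
    void add(mini_scene_service_t* service) { services_[service->id] = service; }

    // Point-to-point: pick one instance by its index id.
    mini_scene_service_t* get_service(int id) const {
        std::map<int, mini_scene_service_t*>::const_iterator it = services_.find(id);
        return it == services_.end() ? nullptr : it->second;
    }

    std::size_t size() const { return services_.size(); }

    // Multicast: apply the same call to every instance in the group.
    void foreach(const std::function<void(mini_scene_service_t*)>& fn) const {
        std::map<int, mini_scene_service_t*>::const_iterator it = services_.begin();
        for (; it != services_.end(); ++it) fn(it->second);
    }

private:
    std::map<int, mini_scene_service_t*> services_;  // non-owning pointers
};

// Registry keyed by service name; a group is created on first use.
class mini_registry_t {
public:
    mini_service_group_t& get_service_group(const std::string& name) {
        return groups_[name];
    }
private:
    std::map<std::string, mini_service_group_t> groups_;
};
```

With this shape, the caller never needs to know how many instances exist: get_service(id) addresses one of them, and foreach reaches all of them under the same name.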

Map/Reduce

Map/Reduce is rarely used in games, so this example borrows WordCount, the most common Map/Reduce demonstration found online. The task: given a set of text strings, count the number of occurrences of each character.

Map operation: divide the text into several sub-texts and distribute them to multiple Worker processes for counting

Reduce operation: merge the results calculated by the Worker processes

Worker: count the number of times each character appears in its text

Define communication messages:

Struct word_count_t {struct in_t: public msg_i {in_t (): msg_i ("word_count_t::in_t") {} string encode () {return (init_encoder () > str;} string str;}) Struct out_t: public msg_i {out_t (): msg_i ("word_count_t::out_t") {} string encode () {return (init_encoder () > value;} map value;};}

Define the interface of the worker:

    class worker_t
    {
    public:
        void word_count(word_count_t::in_t& in_msg_, rpc_callcack_t& cb_)
        {
            word_count_t::out_t out;
            for (size_t i = 0; i < in_msg_.str.size(); ++i)
            {
                map<char, int>::iterator it = out.value.find(in_msg_.str[i]);
                if (it != out.value.end())
                {
                    it->second += 1;
                }
                else
                {
                    out.value[in_msg_.str[i]] = 1;
                }
            }
            cb_(out);
        }
    };

    worker_t worker;
    for (int i = 0; i < 5; ++i)
    {
        // register five instances under index ids 1..5, matching the
        // get_service(1 + i % size) lookup used in do_map
        singleton_t::instance().create_service("worker", 1 + i)
            .bind_service(&worker)
            .reg(&worker_t::word_count);
    }

Simulate the Map/Reduce operation:

    struct lambda_t
    {
        static void reduce(word_count_t::out_t& msg_, map<char, int>* result_, size_t* size_)
        {
            for (map<char, int>::iterator it = msg_.value.begin(); it != msg_.value.end(); ++it)
            {
                map<char, int>::iterator it2 = result_->find(it->first);
                if (it2 != result_->end())
                {
                    it2->second += it->second;
                }
                else
                {
                    (*result_)[it->first] = it->second;
                }
            }
            // decrement the number of outstanding replies, not the pointer
            if (--(*size_) == 0)
            {
                //! reduce end
                delete result_;
                delete size_;
            }
        }

        static void do_map(const char** p, size_t size_)
        {
            map<char, int>* result = new map<char, int>();
            size_t* dest_size = new size_t();
            *dest_size = size_;
            for (size_t i = 0; i < size_; ++i)
            {
                word_count_t::in_t in;
                in.str = p[i];
                singleton_t::instance().get_service_group("worker")
                    ->get_service(1 + i % singleton_t::instance().get_service_group("worker")->size())
                    ->async_call(in, binder_t::callback(&lambda_t::reduce, result, dest_size));
            }
        }
    };

    const char* str_vec[] = {"oh nice", "oh fuck", "oh no", "oh dear", "oh wonderful", "oh bingo"};
    lambda_t::do_map(str_vec, 6);
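The same flow can be followed without any messaging in a single-process sketch. The functions below are hypothetical and run sequentially: count_chars plays the worker, merge_counts plays the reduce step, and map_reduce deals the texts out round-robin the way do_map picks a worker index. No processes or callbacks are involved, so this shows only the data flow, not FFLIB's concurrency.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>
#include <vector>

typedef std::map<char, int> counts_t;

// Worker step: count how often each character appears in one text.
counts_t count_chars(const std::string& text) {
    counts_t counts;
    for (std::size_t i = 0; i < text.size(); ++i) ++counts[text[i]];
    return counts;
}

// Reduce step: fold one partial result into the running total.
void merge_counts(counts_t& total, const counts_t& partial) {
    for (counts_t::const_iterator it = partial.begin(); it != partial.end(); ++it) {
        total[it->first] += it->second;
    }
}

// Map step: assign text i to worker (i % worker_count), run the worker,
// and reduce its result. jobs_per_worker records the assignment.
counts_t map_reduce(const std::vector<std::string>& texts,
                    std::size_t worker_count,
                    std::vector<std::size_t>& jobs_per_worker) {
    assert(worker_count > 0);
    jobs_per_worker.assign(worker_count, 0);
    counts_t total;
    for (std::size_t i = 0; i < texts.size(); ++i) {
        std::size_t worker = i % worker_count;  // round-robin choice
        ++jobs_per_worker[worker];
        merge_counts(total, count_chars(texts[i]));
    }
    return total;
}
```

In the FFLIB version each count_chars call becomes an async_call to a worker service and each merge_counts call becomes the reduce callback, with a shared counter deciding when the last reply has arrived.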

Summary:

FFLIB makes interprocess communication easier
