In addition to Weibo, there is also WeChat
Please pay attention
WeChat public account
Shulou
2025-02-24 Update From: SLTechnology News&Howtos shulou NAV: SLTechnology News&Howtos > Network Security >
Share
Shulou(Shulou.com)06/01 Report--
Generally speaking, our common RPC framework consists of the following three parts:
Registry for server-side registration of remote services and client discovery services
The server provides backstage services and registers its own service information with the registration center.
The client gets the registration information of the remote service from the registry, and then makes the remote procedure call
The registry mentioned above actually belongs to service governance, and even if there is no registry, the function of RPC is complete. Most of what I have come into contact with before is the registry based on zookeeper, where the basic functions of the registry are realized based on consul.
Some features of Consul:
Raft is more direct than Paxos.
In addition, without much description, there is no study of raft.
Support for data centers that can be used to solve problems such as a single point of failure
Integration is simpler than zookeeper (less code, clear and simple logic)
Support health check, support http and tcp
Comes with UI management capabilities and does not require additional third-party support. (zookeeper requires separate deployment of third-party tools such as zkui)
Support for key/value storage
Access the administration page after starting consul
RPC integration
Two interfaces of service registration and service discovery are extracted and implemented using Consul. Here, it is mainly implemented through consul-client (or consul-api), which needs to be introduced in pom:
Com.orbitz.consul consul-client 0.14.1 Service Registration
RegistryService
Provide registration and deletion functions for services
Public interface RegistryService {void register (RpcURL url); void unregister (RpcURL url);}
AbstractConsulService
The base class of consul, used to build Consl objects that serve both the server and the client.
Public class AbstractConsulService {private static final Logger logger = LoggerFactory.getLogger (AbstractConsulService.class); protected final static String CONSUL_NAME= "consul_node_jim"; protected final static String CONSUL_ID= "consul_node_id"; protected final static String CONSUL_TAGS= "v3"; protected final static String CONSUL_HEALTH_INTERVAL= "1s"; protected Consul buildConsul (String registryHost, int registryPort) {return Consul.builder (). WithHostAndPort (HostAndPort.fromString (registryHost+ ":" + registryPort)). Build ();}}
ConsulRegistryService
The service registers the implementation class, which specifies the health check while registering the service.
Deletion of service has not been implemented for the time being
Public class ConsulRegistryService extends AbstractConsulService implements RegistryService {private final static int CONSUL_CONNECT_PERIOD=1*1000; @ Override public void register (RpcURL url) {Consul consul = this.buildConsul (url.getRegistryHost (), url.getRegistryPort ()); AgentClient agent = consul.agentClient (); ImmutableRegCheck check = ImmutableRegCheck.builder () .tcp (url.getHost () + ":" + url.getPort ()) .interval (CONSUL_HEALTH_INTERVAL) .build (); ImmutableRegistration.Builder builder = ImmutableRegistration.builder () Builder.id (CONSUL_ID) .name (CONSUL_NAME) .addTags (CONSUL_TAGS) .address (url.getHost ()) .port (url.getPort ()) .addChecks (check); agent.register (builder.build ());} @ Override public void unregister (RpcURL url) {}}
Since the RPC I implemented is based on TCP, the health check for service registration is also specified that TCP,consul will establish a connection according to the specified IP and port to determine the health status of the service. In the case of http, you need to call the http method and specify the health check address.
ImmutableRegCheck check = ImmutableRegCheck.builder () .tcp (url.getHost () + ":" + url.getPort ()) .interval (CONSUL_HEALTH_INTERVAL) .build ()
The monitoring information at the backend is as follows:
Although only TCP is specified, the backend may still initiate a health check request for HTTP due to some mechanism. The first request log in the figure above.
Service discovery
DiscoveryService
Get valid service information for all registrations.
Public interface DiscoveryService {List getUrls (String registryHost, int registryPort);}
ConsulDiscoveryService
The first is to get a valid list of services:
List urls= Lists.newArrayList (); Consul consul = this.buildConsul (registryHost,registryPort); HealthClient client = consul.healthClient (); String name = CONSUL_NAME;ConsulResponse object= client.getAllServiceInstances (name); List serviceHealths= (List) object.getResponse (); for (ImmutableServiceHealth serviceHealth:serviceHealths) {RpcURL url=new RpcURL (); url.setHost (serviceHealth.getService (). GetAddress ()); url.setPort (serviceHealth.getService (). GetPort ()); urls.add (url);}
Service update snooping needs to notify the caller when a change is found in the list of available services.
Try {ServiceHealthCache serviceHealthCache = ServiceHealthCache.newCache (client, name); serviceHealthCache.addListener (new ConsulCache.Listener () {@ Override public void notify (Map map) {logger.info ("serviceHealthCache.addListener notify"); RpcClientInvokerCache.clear ();}}); serviceHealthCache.start ();} catch (Exception e) {logger.info ("serviceHealthCache.start error:", e);}
Because there was a cache for the client's Invoker, the cache information needs to be updated when the list of services changes.
Here is a simple way to clear the cache directly, in fact, a better method should only deal with changes.
RpcClientInvokerCache
The cache class for the Invoker instantiated by the client
Public class RpcClientInvokerCache {private static CopyOnWriteArrayList connectedHandlers=new CopyOnWriteArrayList (); public static CopyOnWriteArrayList getConnectedHandlersClone () {return (CopyOnWriteArrayList) RpcClientInvokerCache.getConnectedHandlers (). Clone ();} public static void addHandler (RpcClientInvoker handler) {CopyOnWriteArrayList newHandlers = getConnectedHandlersClone (); newHandlers.add (handler); connectedHandlers=newHandlers;} public static CopyOnWriteArrayList getConnectedHandlers () {return connectedHandlers;} public static RpcClientInvoker get (int I) {return connectedHandlers.get (I) } public static int size () {return connectedHandlers.size ();} public static void clear () {CopyOnWriteArrayList newHandlers = getConnectedHandlersClone (); newHandlers.clear (); connectedHandlers=newHandlers;}}
Load balancing
When multiple services are provided at the same time on the same interface, the client needs to have a certain load balancing mechanism to decide which server to assign the client's request to. Here, a simple polling implementation is implemented. The number of requests is accumulated, and the accumulated values are modeled with the size of the service list.
There is a small problem with the method of fetching the service list in the code. It is not taken according to the interface information, and it will be completed later.
Public class RoundRobinLoadbalanceService implements LoadbalanceService {private AtomicInteger roundRobin = new AtomicInteger (0); private static final int MAX_VALUE=1000; private static final int MIN_VALUE=1; private AtomicInteger getRoundRobinValue () {if (this.roundRobin.getAndAdd (1) > MAX_VALUE) {this.roundRobin.set (MIN_VALUE);} return this.roundRobin @ Override public int index (int size) {return (this.getRoundRobinValue () .get () + size)% size;}}
Welcome to subscribe "Shulou Technology Information " to get latest news, interesting things and hot topics in the IT industry, and controls the hottest and latest Internet news, technology news and IT industry trends.
Views: 0
*The comments in the above article only represent the author's personal views and do not represent the views and positions of this website. If you have more insights, please feel free to contribute and share.
Continue with the installation of the previous hadoop.First, install zookooper1. Decompress zookoope
"Every 5-10 years, there's a rare product, a really special, very unusual product that's the most un
© 2024 shulou.com SLNews company. All rights reserved.