This article explains the running logic inside the Linkerd2 proxy (linkerd2-proxy): how it is wired up at startup and how it resolves and routes traffic through the destination component.
Linkerd2 introduction
Linkerd consists of a control plane and a data plane:
The control plane is a set of services that run in their own Kubernetes namespace (linkerd by default). These services aggregate telemetry data, expose a user-facing API, and push control data to the data-plane proxies; together they drive the data plane.
The data plane consists of lightweight proxies written in Rust, one injected into each pod of the service. Each proxy receives all of the pod's incoming traffic, and an initContainer configures iptables so that all outgoing traffic is intercepted and forwarded through the proxy as well. Because the proxy is an add-on that intercepts all of the service's inbound and outbound traffic, no code changes are needed, and it can even be added to a running service.
(Figure: the official Linkerd architecture diagram.)
The proxy is written in Rust; its asynchronous runtime is the Tokio framework, and its service components are built on tower (the poll_ready/call pattern sketched below).
This article focuses on the overall interaction between the proxy and the destination component, and analyzes the running logic inside the proxy.
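Since the walkthrough below constantly runs into tower's readiness protocol, here is a minimal, self-contained sketch of that pattern. It is deliberately simplified: the real tower traits are generic over futures and error types, and the names below are illustrative only.

// Simplified illustration of the tower-style service contract used throughout
// linkerd2-proxy; these are NOT the real trait definitions.
enum Poll<T> {
    Ready(T),
    NotReady,
}

trait Service<Req> {
    type Response;
    // Ask whether the service can accept a request right now.
    fn poll_ready(&mut self) -> Poll<()>;
    // Hand the service a request once it has reported readiness.
    fn call(&mut self, req: Req) -> Self::Response;
}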
Process analysis
Initialization
After proxy starts:
App::init initializes the configuration
App::Main::new creates the main logic object main
main.run_until is given a new task, ProxyParts::build_proxy_task
ProxyParts::build_proxy_task performs a series of initialization steps; here we only care about dst_svc, which is created as follows:
let dst_svc = svc::stack(connect::svc(keepalive))
    .push(tls::client::layer(local_identity.clone()))
    .push_timeout(config.control_connect_timeout)
    .push(control::client::layer())
    .push(control::resolve::layer(dns_resolver.clone()))
    .push(reconnect::layer({
        let backoff = config.control_backoff.clone();
        move |_| Ok(backoff.stream())
    }))
    .push(http_metrics::layer::<_, classify::Response>(
        ctl_http_metrics.clone(),
    ))
    .push(proxy::grpc::req_body_as_payload::layer().per_make())
    .push(control::add_origin::layer())
    .push_buffer_pending(
        config.destination_buffer_capacity,
        config.control_dispatch_timeout,
    )
    .into_inner()
    .make(config.destination_addr.clone());
dst_svc is referenced in two places: the creation of crate::resolve::Resolver and the creation of ProfilesClient, as condensed in the sketch below.
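A hypothetical condensation of those two call sites (the extra ProfilesClient arguments are elided; the real calls appear in the sections that follow):

// Sketch only: the two consumers of dst_svc.
let resolver = api_resolve::Resolve::new(dst_svc.clone());      // Resolver path
let profiles_client = ProfilesClient::new(dst_svc, /* ... */);  // Profiles path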
Resolver
api_resolve::Resolve::new(dst_svc.clone()) creates the resolver object
outbound::resolve wraps it into a map_endpoint::Resolve object, which is passed to outbound::spawn as the resolve parameter to start the outbound (egress) side
In outbound::spawn, resolve is used to build the load-balancing layer and to drive subsequent routing control:
let balancer_layer = svc::layers()
    .push_spawn_ready()
    .push(discover::Layer::new(
        DISCOVER_UPDATE_BUFFER_CAPACITY,
        resolve,
    ))
    .push(balance::layer(EWMA_DEFAULT_RTT, EWMA_DECAY));
In discover::Layer::layer:
let from_resolve = FromResolve::new(self.resolve.clone());
let make_discover = MakeEndpoint::new(make_endpoint, from_resolve);
Buffer::new(self.capacity, make_discover)

Profiles
ProfilesClient::new calls api::client::Destination::new(dst_svc) to create the gRPC client side, which is stored in the service member.
The profiles_client object is then used when building the inbound and outbound stacks (irrelevant code omitted):
let dst_stack = svc::stack(/* ... */)
    .push(profiles::router::layer(
        profile_suffixes,
        profiles_client,
        dst_route_stack,
    ))
    // ...
profiles::router::layer creates a Layer object and stores profiles_client in its get_routes member. When the service is built, Layer::layer is invoked; it creates a MakeSvc object whose get_routes member again holds profiles_client. The pattern is sketched below.
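A self-contained sketch of that layer pattern, with illustrative names only (these are not the real linkerd2-proxy definitions, which implement the tower traits and carry more state):

// Illustrative-only types showing how layer() hands get_routes down to MakeSvc.
trait Layer<M> {
    type Service;
    fn layer(&self, inner: M) -> Self::Service;
}

#[derive(Clone)]
struct ProfilesRouterLayer<G> {
    suffixes: Vec<String>,
    get_routes: G, // holds profiles_client
}

struct MakeSvc<G, M> {
    suffixes: Vec<String>,
    get_routes: G, // the same profiles_client, handed down by layer()
    inner: M,
}

impl<G: Clone, M> Layer<M> for ProfilesRouterLayer<G> {
    type Service = MakeSvc<G, M>;

    fn layer(&self, inner: M) -> Self::Service {
        MakeSvc {
            suffixes: self.suffixes.clone(),
            get_routes: self.get_routes.clone(),
            inner,
        }
    }
}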
Running
When a new connection comes in, the connection object obtained from listen is handed to linkerd_proxy::transport::tls::accept::AcceptTls's call, then to linkerd2_proxy::proxy::server::Server's call, and finally linkerd2_proxy_http::balance::MakeSvc::call and linkerd2_proxy_http::profiles::router::MakeSvc::call are invoked in turn.
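A rough sketch of that chain, in pseudo-code (names follow the text above; real signatures are omitted):

// Pseudo-code only: the accept path for a new connection.
//
// listen()                                         // accept a new TCP connection
//   -> tls::accept::AcceptTls::call(conn)          // TLS handling
//     -> proxy::server::Server::call(conn)         // HTTP server setup
//       -> balance::MakeSvc::call(target)          // build the load-balanced service
//       -> profiles::router::MakeSvc::call(target) // build the profile router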
Balance
In linkerd2_proxy_http::balance::MakeSvc::call:
inner.call(target) is invoked, where inner is the Buffer produced earlier by Buffer::new.
A new linkerd2_proxy_http::balance::MakeSvc object is generated and returned as a Future.
Looking inside inner.call: layer by layer it triggers the call methods of Buffer, MakeEndpoint, FromResolve and so on, finally reaching the resolve.resolve(target) created at startup, which internally calls api_resolve::Resolve::call.
In api_resolve::Resolve::call:
fn call(&mut self, target: T) -> Self::Future {
    let path = target.to_string();
    trace!("resolve {:?}", path);
    self.service
        // gRPC request: Get the destination's endpoints from k8s
        .get(grpc::Request::new(api::GetDestination {
            path,
            scheme: self.scheme.clone(),
            context_token: self.context_token.clone(),
        }))
        .map(|rsp| {
            debug!(metadata = ?rsp.metadata());
            // wrap the result stream
            Resolution {
                inner: rsp.into_inner(),
            }
        })
}
The returned Resolution is wrapped into the MakeSvc again; now look at its poll:
fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
    // this poll calls, in turn:
    //   linkerd2_proxy_api_resolve::resolve::Resolution::poll
    //   linkerd2_proxy_discover::from_resolve::DiscoverFuture::poll
    //   linkerd2_proxy_discover::make_endpoint::DiscoverFuture::poll
    // and finally yields the discover object
    let discover = try_ready!(self.inner.poll());
    let instrument = PendingUntilFirstData::default();
    let loaded = PeakEwmaDiscover::new(discover, self.default_rtt, self.decay, instrument);
    let balance = Balance::new(loaded, self.rng.clone());
    Ok(Async::Ready(balance))
}
Finally, the Balance service is returned.
When a concrete request arrives, Balance::poll_ready is checked first:
fn poll_ready(&mut self) -> Poll<(), Self::Error> {
    // pull discovery updates:
    //   a Remove update drops the endpoint from self.ready_services
    //   an Insert update builds an UnreadyService and adds it to self.unready_services
    self.poll_discover()?;
    // for each UnreadyService, poll it; internally this calls the svc's poll_ready
    // to decide whether the endpoint is usable, and if so moves it into self.ready_services
    self.poll_unready();

    loop {
        if let Some(index) = self.next_ready_index {
            // check the selected endpoint; if it is available, return ready
            if let Ok(Async::Ready(())) = self.poll_ready_index_or_evict(index) {
                return Ok(Async::Ready(()));
            }
        }
        // select an endpoint
        self.next_ready_index = self.p2c_next_ready_index();
        if self.next_ready_index.is_none() {
            // no endpoint is available
            return Ok(Async::NotReady);
        }
    }
}
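The p2c_next_ready_index call above is a power-of-two-choices selection. A minimal, standalone sketch of that idea follows; the function name and signature here are hypothetical, and the real Balance uses peak-EWMA latency as the load estimate rather than a plain slice of loads.

// Hypothetical standalone power-of-two-choices pick:
// sample two distinct ready endpoints and keep the one with the lower load.
use rand::Rng; // assumes the `rand` crate

fn p2c_pick(loads: &[f64], rng: &mut impl Rng) -> Option<usize> {
    match loads.len() {
        0 => None,
        1 => Some(0),
        n => {
            let a = rng.gen_range(0..n);
            // choose a second, distinct index
            let b = (a + rng.gen_range(1..n)) % n;
            Some(if loads[a] <= loads[b] { a } else { b })
        }
    }
}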
Once ready, call is invoked with the request req:
fn call(&mut self, request: Req) -> Self::Future {
    // take the next available svc and remove it from ready_services
    let index = self.next_ready_index.take().expect("not ready");
    let (key, mut svc) = self
        .ready_services
        .swap_remove_index(index)
        .expect("invalid ready index");
    // hand the request over to the selected service
    let fut = svc.call(request);
    // put the service into the unready set
    self.push_unready(key, svc);
    fut.map_err(Into::into)
}

Profiles
In linkerd2_proxy_http::profiles::router::MakeSvc::call:
// Initiate a stream to get route and dst_override updates for this
// destination.
let route_stream = match target.get_destination() {
    Some(ref dst) => {
        if self.suffixes.iter().any(|s| s.contains(dst.name())) {
            debug!("fetching routes for {:?}", dst);
            self.get_routes.get_routes(&dst)
        } else {
            debug!("skipping route discovery for dst={:?}", dst);
            None
        }
    }
    None => {
        debug!("no destination for routes");
        None
    }
};
After these checks, ProfilesClient::get_routes is called and the result is stored in route_stream.
Inside get_routes:
fn get_routes(&self, dst: &NameAddr) -> Option<Self::Stream> {
    // create the channel that carries route updates
    let (tx, rx) = mpsc::channel(1);
    // This oneshot allows the daemon to be notified when the Self::Stream
    // is dropped.
    let (hangup_tx, hangup_rx) = oneshot::channel();
    // build the Daemon object (a Future task)
    let daemon = Daemon {
        tx,
        hangup: hangup_rx,
        dst: format!("{}", dst),
        state: State::Disconnected,
        service: self.service.clone(),
        backoff: self.backoff,
        context_token: self.context_token.clone(),
    };
    // spawn the daemon; the executor will drive Daemon::poll
    let spawn = DefaultExecutor::current().spawn(Box::new(daemon.map_err(|_| ())));
    // hand the receiving end of the channel back to the caller
    spawn.ok().map(|_| Rx {
        rx,
        _hangup: hangup_tx,
    })
}
Then take a look at Daemon::poll:
fn poll(&mut self) -> Poll<(), ()> {
    loop {
        // step the state machine
        self.state = match self.state {
            // not connected yet
            State::Disconnected => {
                match self.service.poll_ready() {
                    Ok(Async::NotReady) => return Ok(Async::NotReady),
                    Ok(Async::Ready(())) => {}
                    Err(err) => {
                        error!(
                            "profile service unexpected error (dst = {}): {:?}",
                            self.dst, err,
                        );
                        return Ok(Async::Ready(()));
                    }
                };
                // build the gRPC request
                let req = api::GetDestination {
                    scheme: "k8s".to_owned(),
                    path: self.dst.clone(),
                    context_token: self.context_token.clone(),
                };
                debug!("getting profile: {:?}", req);
                // issue the request and keep the response future
                let rspf = self.service.get_profile(grpc::Request::new(req));
                State::Waiting(rspf)
            }
            // the request is in flight; wait for the reply
            State::Waiting(ref mut f) => match f.poll() {
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                // a normal reply: start streaming it
                Ok(Async::Ready(rsp)) => {
                    trace!("response received");
                    State::Streaming(rsp.into_inner())
                }
                Err(e) => {
                    warn!("error fetching profile for {}: {:?}", self.dst, e);
                    State::Backoff(Delay::new(clock::now() + self.backoff))
                }
            },
            // receiving the reply stream
            State::Streaming(ref mut s) => {
                // process the reply stream; note that
                //   the first argument is the get_profile reply stream
                //   the second argument is the channel sender created earlier
                match Self::proxy_stream(s, &mut self.tx, &mut self.hangup) {
                    Async::NotReady => return Ok(Async::NotReady),
                    Async::Ready(StreamState::SendLost) => return Ok(().into()),
                    Async::Ready(StreamState::RecvDone) => {
                        State::Backoff(Delay::new(clock::now() + self.backoff))
                    }
                }
            }
            // the request ended abnormally; back off before reconnecting
            State::Backoff(ref mut f) => match f.poll() {
                Ok(Async::NotReady) => return Ok(Async::NotReady),
                Err(_) | Ok(Async::Ready(())) => State::Disconnected,
            },
        };
    }
}
Then proxy_stream:
fn proxy_stream(
    rx: &mut grpc::Streaming<api::DestinationProfile, T::ResponseBody>,
    tx: &mut mpsc::Sender<profiles::Routes>,
    hangup: &mut oneshot::Receiver<Never>,
) -> Async<StreamState> {
    loop {
        // is the sender ready to accept another update?
        match tx.poll_ready() {
            Ok(Async::NotReady) => return Async::NotReady,
            Ok(Async::Ready(())) => {}
            Err(_) => return StreamState::SendLost.into(),
        }

        // pull one item from the gRPC stream
        match rx.poll() {
            Ok(Async::NotReady) => match hangup.poll() {
                Ok(Async::Ready(never)) => match never {}, // unreachable!
                Ok(Async::NotReady) => {
                    // We are now scheduled to be notified if the hangup tx
                    // is dropped.
                    return Async::NotReady;
                }
                Err(_) => {
                    // Hangup tx has been dropped.
                    debug!("profile stream cancelled");
                    return StreamState::SendLost.into();
                }
            },
            Ok(Async::Ready(None)) => return StreamState::RecvDone.into(),
            // a profile structure was received successfully
            Ok(Async::Ready(Some(profile))) => {
                debug!("profile received: {:?}", profile);
                // convert the received data
                let retry_budget = profile.retry_budget.and_then(convert_retry_budget);
                let routes = profile
                    .routes
                    .into_iter()
                    .filter_map(move |orig| convert_route(orig, retry_budget.as_ref()))
                    .collect();
                let dst_overrides = profile
                    .dst_overrides
                    .into_iter()
                    .filter_map(convert_dst_override)
                    .collect();
                // build a profiles::Routes and push it into the sender
                match tx.start_send(profiles::Routes {
                    routes,
                    dst_overrides,
                }) {
                    Ok(AsyncSink::Ready) => {} // continue
                    Ok(AsyncSink::NotReady(_)) => {
                        info!("dropping profile update due to a full buffer");
                        // This must have been because another task stole
                        // our tx slot? It seems pretty unlikely, but possible?
                        return Async::NotReady;
                    }
                    Err(_) => {
                        return StreamState::SendLost.into();
                    }
                }
            }
            Err(e) => {
                warn!("profile stream failed: {:?}", e);
                return StreamState::RecvDone.into();
            }
        }
    }
}
Back in MakeSvc::call, the route_stream created above is used to build a linkerd2_proxy::proxy::http::profiles::router::Service task object. In its poll_ready method, profiles::Routes values are read from route_stream via poll_route_stream, and update_routes is called to build the concrete routing rules, a linkerd2_router::Router. At this point the routing table is in place; when a concrete request arrives, call invokes linkerd2_router::call to decide the route for the request. A much-simplified sketch of this poll_ready flow follows.
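This standalone sketch uses stand-in types and a plain channel rather than the real profiles::Routes stream, so it only illustrates the shape of the flow: each readiness check drains pending profile updates and rebuilds the router before requests are admitted.

use std::sync::mpsc::Receiver;

// Stand-in types; the real code uses profiles::Routes and linkerd2_router::Router.
struct Routes;
struct Router {
    routes: Vec<Routes>,
}

struct ProfileRouterService<S> {
    route_stream: Receiver<Routes>, // stand-in for the route_stream created in call()
    router: Router,
    inner: S,
}

impl<S> ProfileRouterService<S> {
    fn poll_ready(&mut self) {
        // drain any profile updates that arrived since the last readiness check...
        while let Ok(routes) = self.route_stream.try_recv() {
            // ...and rebuild the routing table from the freshly received profile
            self.update_routes(routes);
        }
    }

    fn update_routes(&mut self, routes: Routes) {
        self.router = Router {
            routes: vec![routes],
        };
    }
}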
(Figure: the profile lookup flow.)