2025-01-18 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains when kubelet's Device Manager is created and how it is started and used. It follows the kubelet source code step by step.
Create Device Manager Instance
When Device Manager is created
Device Manager is one of the many managers run by kubelet, alongside Volume Manager and QoS Container Manager. It is created in NewContainerManager when kubelet starts.
pkg/kubelet/cm/container_manager_linux.go:197

```go
func NewContainerManager(mountUtil mount.Interface, cadvisorInterface cadvisor.Interface, nodeConfig NodeConfig, failSwapOn bool, devicePluginEnabled bool, recorder record.EventRecorder) (ContainerManager, error) {
	...
	glog.Infof("Creating device plugin manager: %t", devicePluginEnabled)
	if devicePluginEnabled {
		cm.deviceManager, err = devicemanager.NewManagerImpl()
	} else {
		cm.deviceManager, err = devicemanager.NewManagerStub()
	}
	if err != nil {
		return nil, err
	}
	...
}
```

ManagerImpl structure
It is necessary to first understand the structure of Device Manager:
```go
// ManagerImpl is the structure in charge of managing Device Plugins.
type ManagerImpl struct {
	socketname string
	socketdir  string

	endpoints map[string]endpoint // Key is ResourceName
	mutex     sync.Mutex

	server *grpc.Server

	// activePods is a method for listing active pods on the node
	// so the amount of pluginResources requested by existing pods
	// could be counted when updating allocated devices
	activePods ActivePodsFunc

	// sourcesReady provides the readiness of kubelet configuration sources such as apiserver update readiness.
	// We use it to determine when we can purge inactive pods from checkpointed state.
	sourcesReady config.SourcesReady

	// callback is used for updating devices' states in one time call.
	// e.g. a new device is advertised, two old devices are deleted and a running device fails.
	callback monitorCallback

	// healthyDevices contains all of the registered healthy resourceNames and their exported device IDs.
	healthyDevices map[string]sets.String

	// unhealthyDevices contains all of the unhealthy devices and their exported device IDs.
	unhealthyDevices map[string]sets.String

	// allocatedDevices contains allocated deviceIds, keyed by resourceName.
	allocatedDevices map[string]sets.String

	// podDevices contains pod to allocated device mapping.
	podDevices podDevices
	store      utilstore.Store
	pluginOpts map[string]*pluginapi.DevicePluginOptions
}
```
The core fields are described below:
socketname: the name of the socket exposed by kubelet, that is, kubelet.sock.
socketdir: the directory where device plugin sockets are stored, /var/lib/kubelet/device-plugins/.
endpoints: a map whose key is the Resource Name and whose value is an endpoint interface (run, stop, allocate, preStartContainer, getDevices, callback, isStopped, stopGracePeriodExpired). Each endpoint corresponds to one registered device plugin. It handles the gRPC communication with that plugin and caches the device states the plugin reports.
server: the gRPC server that exposes the Register service.
activePods: a function that returns all active Pods on this node, that is, Pods not in a terminated state. In kubelet's initializeRuntimeDependentModules, activePods is set to the following function:
```go
// GetActivePods returns non-terminal pods
func (kl *Kubelet) GetActivePods() []*v1.Pod {
	allPods := kl.podManager.GetPods()
	activePods := kl.filterOutTerminatedPods(allPods)
	return activePods
}
```
callback: kubelet invokes this function when a device plugin's ListAndWatch gRPC stream reports device state changes. These changes include newly added devices, deleted devices, and changes in device health. Because of this ListAndWatch callback, automatic device discovery and hot plugging are possible.

```go
type monitorCallback func(resourceName string, added, updated, deleted []pluginapi.Device)
```
healthyDevices: a map whose key is the Resource Name and whose value is the set of healthy device IDs.
unhealthyDevices: a map whose key is the Resource Name and whose value is the set of unhealthy device IDs.
allocatedDevices: a map whose key is the Resource Name and whose value is the set of allocated device IDs.
podDevices: records the device allocation of every container in every Pod.
```go
// ContainerAllocateResponse is the allocation information for the devices of a container,
// including injected environment variables, mounts, device specs and annotations.
type ContainerAllocateResponse struct {
	Envs        map[string]string
	Mounts      []*Mount
	Devices     []*DeviceSpec
	Annotations map[string]string
}

// deviceAllocateInfo
type deviceAllocateInfo struct {
	deviceIds sets.String
	allocResp *pluginapi.ContainerAllocateResponse
}

type resourceAllocateInfo map[string]deviceAllocateInfo // Keyed by resourceName.
type containerDevices map[string]resourceAllocateInfo    // Keyed by containerName.
type podDevices map[string]containerDevices              // Keyed by podUID.
```
store: the file store for checkpointData (/var/lib/kubelet/device-plugins/kubelet_internal_checkpoint). It stores the devices allocated to each Pod (PodDeviceEntries). It also stores each registered Resource Name together with its device IDs.
```go
type checkpointData struct {
	PodDeviceEntries  []podDevicesCheckpointEntry
	RegisteredDevices map[string][]string // key is ResourceName, value is DeviceIDs
}

type podDevicesCheckpointEntry struct {
	PodUID        string
	ContainerName string
	ResourceName  string
	DeviceIDs     []string
	AllocResp     []byte
}
```
pluginOpts: a map whose key is the Resource Name and whose value is the plugin's DevicePluginOptions. Currently it has only one field, PreStartRequired bool. This field indicates whether kubelet must call the device plugin's PreStartContainer API before the container starts. In nvidia-k8s-plugin, PreStartContainer is an empty implementation.
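The nesting of podDevices (podUID → containerName → resourceName → device IDs) maps directly onto the flat checkpoint rows. The sketch below models that relationship with plain Go maps and flattens the maps into rows shaped like podDevicesCheckpointEntry. It is a simplification under stated assumptions. Device IDs are a []string instead of sets.String, and the plugin's allocation response (AllocResp) is omitted. The helper names toCheckpointEntries and sortedKeys are hypothetical.

```go
package main

import (
	"fmt"
	"sort"
)

// Simplified stand-ins for the kubelet types (hypothetical): device IDs are a
// []string instead of sets.String and the allocation response is dropped.
type resourceAllocateInfo map[string][]string         // keyed by resourceName
type containerDevices map[string]resourceAllocateInfo // keyed by containerName
type podDevices map[string]containerDevices           // keyed by podUID

// podDevicesCheckpointEntry mirrors the checkpoint row above, minus AllocResp.
type podDevicesCheckpointEntry struct {
	PodUID        string
	ContainerName string
	ResourceName  string
	DeviceIDs     []string
}

// sortedKeys returns the keys of any string-keyed map in sorted order.
func sortedKeys[M ~map[string]V, V any](m M) []string {
	keys := make([]string, 0, len(m))
	for k := range m {
		keys = append(keys, k)
	}
	sort.Strings(keys)
	return keys
}

// toCheckpointEntries flattens the nested maps into one entry per
// (pod, container, resource); keys are sorted for deterministic output.
func toCheckpointEntries(pdev podDevices) []podDevicesCheckpointEntry {
	var entries []podDevicesCheckpointEntry
	for _, podUID := range sortedKeys(pdev) {
		conts := pdev[podUID]
		for _, cont := range sortedKeys(conts) {
			res := conts[cont]
			for _, name := range sortedKeys(res) {
				entries = append(entries, podDevicesCheckpointEntry{
					PodUID:        podUID,
					ContainerName: cont,
					ResourceName:  name,
					DeviceIDs:     res[name],
				})
			}
		}
	}
	return entries
}

func main() {
	pdev := podDevices{
		"pod-1": {
			"init": {"nvidia.com/gpu": {"GPU-0"}},
			"app":  {"nvidia.com/gpu": {"GPU-0", "GPU-1"}},
		},
	}
	for _, e := range toCheckpointEntries(pdev) {
		fmt.Printf("%s/%s %s %v\n", e.PodUID, e.ContainerName, e.ResourceName, e.DeviceIDs)
	}
}
```

Flattening like this is what lets the checkpoint be a simple list of rows, while the in-memory structure stays indexed by Pod for fast lookup.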
NewManagerImpl
Let's look at NewManagerImpl, which actually creates the Device Manager.
pkg/kubelet/cm/devicemanager/manager.go:97

```go
// NewManagerImpl creates a new manager.
func NewManagerImpl() (*ManagerImpl, error) {
	// Interact with device plugins via /var/lib/kubelet/device-plugins/kubelet.sock
	return newManagerImpl(pluginapi.KubeletSocket)
}

func newManagerImpl(socketPath string) (*ManagerImpl, error) {
	glog.V(2).Infof("Creating Device Plugin manager at %s", socketPath)

	if socketPath == "" || !filepath.IsAbs(socketPath) {
		return nil, fmt.Errorf(errBadSocket+" %v", socketPath)
	}

	dir, file := filepath.Split(socketPath)
	manager := &ManagerImpl{
		endpoints:        make(map[string]endpoint),
		socketname:       file,
		socketdir:        dir,
		healthyDevices:   make(map[string]sets.String),
		unhealthyDevices: make(map[string]sets.String),
		allocatedDevices: make(map[string]sets.String),
		pluginOpts:       make(map[string]*pluginapi.DevicePluginOptions),
		podDevices:       make(podDevices),
	}
	manager.callback = manager.genericDeviceUpdateCallback

	// The following structs are populated with real implementations in manager.Start()
	// Before that, initializes them to perform no-op operations.
	manager.activePods = func() []*v1.Pod { return []*v1.Pod{} }
	manager.sourcesReady = &sourcesReadyStub{}
	var err error
	// Create a file-store key-value file, kubelet_internal_checkpoint, under
	// /var/lib/kubelet/device-plugins/ to serve as kubelet's device plugin checkpoint.
	manager.store, err = utilstore.NewFileStore(dir, utilfs.DefaultFs{})
	if err != nil {
		return nil, fmt.Errorf("failed to initialize device plugin checkpointing store: %+v", err)
	}

	return manager, nil
}
```
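The path handling at the top of newManagerImpl can be run in isolation. The sketch below reproduces only two steps. It applies the validation (non-empty, absolute). It then uses filepath.Split to produce socketdir and socketname. validateSocketPath is a hypothetical name. The error text is illustrative and is not kubelet's errBadSocket message.

```go
package main

import (
	"errors"
	"fmt"
	"path/filepath"
)

// validateSocketPath (hypothetical) mirrors the checks in newManagerImpl:
// the socket path must be non-empty and absolute. It is then split into the
// directory (socketdir) and the file name (socketname).
func validateSocketPath(socketPath string) (dir, file string, err error) {
	if socketPath == "" || !filepath.IsAbs(socketPath) {
		return "", "", errors.New("bad socket path: " + socketPath)
	}
	dir, file = filepath.Split(socketPath)
	return dir, file, nil
}

func main() {
	dir, file, err := validateSocketPath("/var/lib/kubelet/device-plugins/kubelet.sock")
	fmt.Println(dir, file, err) // /var/lib/kubelet/device-plugins/ kubelet.sock <nil>

	_, _, err = validateSocketPath("kubelet.sock")
	fmt.Println(err) // bad socket path: kubelet.sock
}
```

Note that filepath.Split keeps the trailing slash on the directory. This is why socketdir reads /var/lib/kubelet/device-plugins/ and why it can later be joined back with filepath.Join.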
Kubelet's Device Manager talks to device plugins through /var/lib/kubelet/device-plugins/kubelet.sock.
It registers genericDeviceUpdateCallback as the callback. This callback handles device add, delete and update events.
It creates a file-store key-value file, kubelet_internal_checkpoint, under /var/lib/kubelet/device-plugins/. This file serves as the checkpoint for kubelet's device plugin state.
When a device add/delete/update event is received, the change is written to kubelet_internal_checkpoint.
When a device plugin has been stopped for longer than the grace period, its devices are removed from the checkpoint. The grace period is hard-coded to 5 minutes and is not configurable. Until it expires, Device Manager keeps caching the endpoint and its devices.
When devices are allocated to a container, podDevices is also written to the checkpoint.
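The grace-period rule above can be expressed as a pure function over an endpoint's stopTime. This is a minimal model that assumes the fixed 5-minute period described in the text. The method names follow the isStopped and stopGracePeriodExpired entries of the endpoint interface listed earlier, but the bodies here are illustrative. The current time is passed in explicitly so the behavior is easy to test.

```go
package main

import (
	"fmt"
	"time"
)

// stopGracePeriod is the fixed window described in the text (5 minutes).
const stopGracePeriod = 5 * time.Minute

// endpoint is a stripped-down stand-in holding only the stop timestamp.
// A zero stopTime means the endpoint is still running.
type endpoint struct {
	stopTime time.Time
}

func (e *endpoint) isStopped() bool { return !e.stopTime.IsZero() }

// stopGracePeriodExpired reports whether, as of now, the endpoint has been
// stopped for longer than stopGracePeriod. Until then the manager keeps the
// endpoint and its devices cached; afterwards they can be dropped.
func (e *endpoint) stopGracePeriodExpired(now time.Time) bool {
	return e.isStopped() && now.Sub(e.stopTime) > stopGracePeriod
}

func main() {
	stoppedAt := time.Date(2024, 1, 1, 12, 0, 0, 0, time.UTC)
	e := &endpoint{stopTime: stoppedAt}
	fmt.Println(e.stopGracePeriodExpired(stoppedAt.Add(4 * time.Minute))) // false
	fmt.Println(e.stopGracePeriodExpired(stoppedAt.Add(6 * time.Minute))) // true
}
```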
Next, let's look at the callback implementation, genericDeviceUpdateCallback, to see how Device Manager handles device add/delete/update messages.
pkg/kubelet/cm/devicemanager/manager.go:134

```go
func (m *ManagerImpl) genericDeviceUpdateCallback(resourceName string, added, updated, deleted []pluginapi.Device) {
	kept := append(updated, added...)
	m.mutex.Lock()
	if _, ok := m.healthyDevices[resourceName]; !ok {
		m.healthyDevices[resourceName] = sets.NewString()
	}
	if _, ok := m.unhealthyDevices[resourceName]; !ok {
		m.unhealthyDevices[resourceName] = sets.NewString()
	}
	for _, dev := range kept {
		if dev.Health == pluginapi.Healthy {
			m.healthyDevices[resourceName].Insert(dev.ID)
			m.unhealthyDevices[resourceName].Delete(dev.ID)
		} else {
			m.unhealthyDevices[resourceName].Insert(dev.ID)
			m.healthyDevices[resourceName].Delete(dev.ID)
		}
	}
	for _, dev := range deleted {
		m.healthyDevices[resourceName].Delete(dev.ID)
		m.unhealthyDevices[resourceName].Delete(dev.ID)
	}
	m.mutex.Unlock()
	m.writeCheckpoint()
}
```
If a device reported in the callback is Healthy, its ID is inserted into healthyDevices in ManagerImpl and removed from unhealthyDevices.
If a device reported in the callback is Unhealthy, its ID is inserted into unhealthyDevices in ManagerImpl and removed from healthyDevices.
Devices that the device plugin reports as deleted are removed from both healthyDevices and unhealthyDevices.
Finally, the data in ManagerImpl is written to the checkpoint file.
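The bookkeeping in genericDeviceUpdateCallback can be reproduced with plain Go maps. The sketch below is a simplified model and not the kubelet code. sets.String is replaced by map[string]bool. The mutex and the writeCheckpoint call are left out. The Device type carries only an ID and a Healthy flag. The deviceCache type and its methods are hypothetical.

```go
package main

import "fmt"

// Device is a simplified stand-in for pluginapi.Device.
type Device struct {
	ID      string
	Healthy bool
}

// deviceCache holds, per resource name, the healthy and unhealthy device IDs.
type deviceCache struct {
	healthy   map[string]map[string]bool
	unhealthy map[string]map[string]bool
}

func newDeviceCache() *deviceCache {
	return &deviceCache{
		healthy:   map[string]map[string]bool{},
		unhealthy: map[string]map[string]bool{},
	}
}

// update applies one batch of changes the way the callback above does:
// added and updated devices are filed by health, deleted ones leave both sets.
func (c *deviceCache) update(resource string, added, updated, deleted []Device) {
	if c.healthy[resource] == nil {
		c.healthy[resource] = map[string]bool{}
	}
	if c.unhealthy[resource] == nil {
		c.unhealthy[resource] = map[string]bool{}
	}
	// Copy into a fresh slice so appending never writes into the caller's array.
	kept := append(append([]Device{}, updated...), added...)
	for _, d := range kept {
		if d.Healthy {
			c.healthy[resource][d.ID] = true
			delete(c.unhealthy[resource], d.ID)
		} else {
			c.unhealthy[resource][d.ID] = true
			delete(c.healthy[resource], d.ID)
		}
	}
	for _, d := range deleted {
		delete(c.healthy[resource], d.ID)
		delete(c.unhealthy[resource], d.ID)
	}
}

func main() {
	c := newDeviceCache()
	const gpu = "nvidia.com/gpu"
	c.update(gpu, []Device{{"GPU-0", true}, {"GPU-1", true}}, nil, nil)
	c.update(gpu, nil, []Device{{"GPU-1", false}}, nil) // GPU-1 turns unhealthy
	c.update(gpu, nil, nil, []Device{{"GPU-0", true}})  // GPU-0 is removed
	fmt.Println(len(c.healthy[gpu]), len(c.unhealthy[gpu])) // 0 1
}
```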
Startup of Device Manager
We have now covered how Device Manager is created, including its checkpoint and callback. Next, we analyze how Device Manager is started.
Start Device Manager
Device Manager is started during the Start of containerManagerImpl.
pkg/kubelet/cm/container_manager_linux.go:527

```go
func (cm *containerManagerImpl) Start(node *v1.Node,
	activePods ActivePodsFunc,
	sourcesReady config.SourcesReady,
	podStatusProvider status.PodStatusProvider,
	runtimeService internalapi.RuntimeService) error {

	// Starts device manager.
	if err := cm.deviceManager.Start(devicemanager.ActivePodsFunc(activePods), sourcesReady); err != nil {
		return err
	}

	return nil
}
```
The first argument to deviceManager.Start is the function that gets the active (non-terminated) Pods of the node.
sourcesReady tracks the readiness of kubelet's Pod configuration sources. These sources include:
file: static Pods created from static manifest files.
http: Pod information obtained through an HTTP endpoint.
api: Pod information obtained from the Kubernetes API server. This is Kubernetes' default mechanism.
*: all of the above source types.
ManagerImpl Start
ManagerImpl.Start starts Device Manager and serves its gRPC service.
pkg/kubelet/cm/devicemanager/manager.go:204

```go
// Start starts the Device Plugin Manager and start initialization of
// podDevices and allocatedDevices information from checkpoint-ed state and
// starts device plugin registration service.
func (m *ManagerImpl) Start(activePods ActivePodsFunc, sourcesReady config.SourcesReady) error {
	m.activePods = activePods
	m.sourcesReady = sourcesReady

	// Loads in allocatedDevices information from disk.
	err := m.readCheckpoint()
	...

	socketPath := filepath.Join(m.socketdir, m.socketname)
	os.MkdirAll(m.socketdir, 0755)

	// Removes all stale sockets in m.socketdir. Device plugins can monitor
	// this and use it as a signal to re-register with the new Kubelet.
	if err := m.removeContents(m.socketdir); err != nil {
		glog.Errorf("Fail to clean up stale contents under %s: %+v", m.socketdir, err)
	}

	s, err := net.Listen("unix", socketPath)
	if err != nil {
		glog.Errorf(errListenSocket+" %+v", err)
		return err
	}

	m.server = grpc.NewServer([]grpc.ServerOption{}...)

	pluginapi.RegisterRegistrationServer(m.server, m)
	go m.server.Serve(s)

	glog.V(2).Infof("Serving device plugin registration server on %q", socketPath)

	return nil
}
```
First, Start reads the checkpoint file and restores the following ManagerImpl data:
PodDevices
AllocatedDevices
HealthyDevices
UnhealthyDevices
endpoints. Each restored endpoint's stopTime is set to the current time. As a result, after a kubelet restart these resources are unavailable until their device plugins register again.
Next, it deletes everything under /var/lib/kubelet/device-plugins/ except the checkpoint file. That means all socket files: kubelet's own kubelet.sock and the sockets of every previously registered device plugin. Device plugins watch whether kubelet.sock has been deleted, and when it is, they re-register with kubelet.
Finally, it creates kubelet.sock and starts a gRPC server on it. Currently only the Register service is served, which device plugins call to register themselves.
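The socket-directory cleanup step can be sketched with the standard library alone. The helper removeStaleContents is hypothetical. It deletes every non-directory entry in a directory except one named file, which is the behavior the text describes for removeContents. It is not kubelet's implementation. cleanupDemo is also hypothetical and only builds a throwaway directory to exercise the helper. After the cleanup, Start listens with net.Listen("unix", socketPath), as in the code above.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// removeStaleContents (hypothetical) deletes every non-directory entry in dir
// except the file named keep, e.g. stale *.sock files left by a previous
// kubelet and by device plugins. Subdirectories are skipped.
func removeStaleContents(dir, keep string) error {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return err
	}
	var firstErr error
	for _, e := range entries {
		if e.Name() == keep || e.IsDir() {
			continue
		}
		if err := os.Remove(filepath.Join(dir, e.Name())); err != nil && firstErr == nil {
			firstErr = err
		}
	}
	return firstErr
}

// cleanupDemo builds a temporary directory resembling
// /var/lib/kubelet/device-plugins/, runs the cleanup and returns the survivors.
func cleanupDemo() ([]string, error) {
	dir, err := os.MkdirTemp("", "device-plugins")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)

	for _, name := range []string{"kubelet.sock", "nvidia.sock", "kubelet_internal_checkpoint"} {
		if err := os.WriteFile(filepath.Join(dir, name), nil, 0o600); err != nil {
			return nil, err
		}
	}
	if err := removeStaleContents(dir, "kubelet_internal_checkpoint"); err != nil {
		return nil, err
	}
	entries, err := os.ReadDir(dir)
	if err != nil {
		return nil, err
	}
	var names []string
	for _, e := range entries {
		names = append(names, e.Name())
	}
	return names, nil
}

func main() {
	names, err := cleanupDemo()
	if err != nil {
		panic(err)
	}
	fmt.Println(names) // [kubelet_internal_checkpoint]
}
```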
Register service
Let's take a look at Register, the only gRPC interface provided by kubelet Device Manager.
Register
pkg/kubelet/cm/devicemanager/manager.go:289

```go
// Register registers a device plugin.
func (m *ManagerImpl) Register(ctx context.Context, r *pluginapi.RegisterRequest) (*pluginapi.Empty, error) {
	glog.Infof("Got registration request from device plugin with resource name %q", r.ResourceName)
	metrics.DevicePluginRegistrationCount.WithLabelValues(r.ResourceName).Inc()
	var versionCompatible bool
	for _, v := range pluginapi.SupportedVersions {
		if r.Version == v {
			versionCompatible = true
			break
		}
	}
	if !versionCompatible {
		errorString := fmt.Sprintf(errUnsupportedVersion, r.Version, pluginapi.SupportedVersions)
		glog.Infof("Bad registration request from device plugin with resource name %q: %v", r.ResourceName, errorString)
		return &pluginapi.Empty{}, fmt.Errorf(errorString)
	}

	if !v1helper.IsExtendedResourceName(v1.ResourceName(r.ResourceName)) {
		errorString := fmt.Sprintf(errInvalidResourceName, r.ResourceName)
		glog.Infof("Bad registration request from device plugin: %v", errorString)
		return &pluginapi.Empty{}, fmt.Errorf(errorString)
	}

	// TODO: for now, always accepts newest device plugin. Later may consider to
	// add some policies here, e.g., verify whether an old device plugin with the
	// same resource name is still alive to determine whether we want to accept
	// the new registration.
	go m.addEndpoint(r)

	return &pluginapi.Empty{}, nil
}
```
Device plugins send the registration request to kubelet. RegisterRequest is defined as:
```go
type RegisterRequest struct {
	// device plugin API version; for Kubernetes 1.10 it is v1beta1
	Version string
	// socket name of the device plugin
	Endpoint     string
	ResourceName string
	Options      *DevicePluginOptions
}
```
The registered Resource Name is checked to see if it conforms to the rules of Extended Resource:
Resource Name cannot belong to kubernetes.io, but must have its own domain, such as nvidia.com.
The Resource Name must not start with the requests. prefix.
The corresponding Resource value can only be an integer value.
Finally, Register calls addEndpoint in a new goroutine to register the plugin.
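The two admission checks in Register can be approximated in a few lines: the API version is checked first, then the extended-resource name. This is a simplified sketch with several assumptions. It assumes v1beta1 is the only supported version, as for Kubernetes 1.10. It approximates v1helper.IsExtendedResourceName with string checks: the name must carry a domain, must not be in the kubernetes.io domain, and must not start with "requests.". The real helper also validates the name's syntax. The integer-quantity rule applies to Pod requests rather than to the name, so it is not modeled. validateRegistration and isExtendedResourceName here are hypothetical names.

```go
package main

import (
	"fmt"
	"strings"
)

// supportedVersions is assumed to contain only v1beta1 (Kubernetes 1.10).
var supportedVersions = []string{"v1beta1"}

// isExtendedResourceName approximates the rules listed above.
func isExtendedResourceName(name string) bool {
	if !strings.Contains(name, "/") { // native names such as "cpu" have no domain
		return false
	}
	if strings.Contains(name, "kubernetes.io/") { // the kubernetes.io domain is reserved
		return false
	}
	return !strings.HasPrefix(name, "requests.")
}

// validateRegistration checks the version first, then the name,
// in the same order as Register.
func validateRegistration(version, resourceName string) error {
	compatible := false
	for _, v := range supportedVersions {
		if version == v {
			compatible = true
			break
		}
	}
	if !compatible {
		return fmt.Errorf("unsupported version %q, expected one of %v", version, supportedVersions)
	}
	if !isExtendedResourceName(resourceName) {
		return fmt.Errorf("invalid extended resource name %q", resourceName)
	}
	return nil
}

func main() {
	fmt.Println(validateRegistration("v1beta1", "nvidia.com/gpu"))    // <nil>
	fmt.Println(validateRegistration("v1beta1", "kubernetes.io/gpu")) // rejected: reserved domain
	fmt.Println(validateRegistration("v1alpha", "nvidia.com/gpu"))    // rejected: unsupported version
}
```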
addEndpoint: registering the device plugin
As the Register method above shows, the actual registration logic lives in addEndpoint.
pkg/kubelet/cm/devicemanager/manager.go:332

```go
func (m *ManagerImpl) addEndpoint(r *pluginapi.RegisterRequest) {
	existingDevs := make(map[string]pluginapi.Device)
	m.mutex.Lock()
	old, ok := m.endpoints[r.ResourceName]
	if ok && old != nil {
		// Pass devices of previous endpoint into re-registered one
		// to avoid potential orphaned devices upon re-registration
		devices := make(map[string]pluginapi.Device)
		for _, device := range old.getDevices() {
			devices[device.ID] = device
		}
		existingDevs = devices
	}
	m.mutex.Unlock()

	socketPath := filepath.Join(m.socketdir, r.Endpoint)
	e, err := newEndpointImpl(socketPath, r.ResourceName, existingDevs, m.callback)
	if err != nil {
		glog.Errorf("Failed to dial device plugin with request %v: %v", r, err)
		return
	}

	m.mutex.Lock()
	if r.Options != nil {
		m.pluginOpts[r.ResourceName] = r.Options
	}
	// Check for potential re-registration during the initialization of new endpoint,
	// and skip updating if re-registration happens.
	// TODO: simplify the part once we have a better way to handle registered devices
	ext := m.endpoints[r.ResourceName]
	if ext != old {
		glog.Warningf("Some other endpoint %v is added while endpoint %v is initialized", ext, e)
		m.mutex.Unlock()
		e.stop()
		return
	}
	// Associates the newly created endpoint with the corresponding resource name.
	// Stops existing endpoint if there is any.
	m.endpoints[r.ResourceName] = e
	glog.V(2).Infof("Registered endpoint %v", e)
	m.mutex.Unlock()

	if old != nil {
		old.stop()
	}

	go func() {
		e.run()
		e.stop()
		m.mutex.Lock()
		if old, ok := m.endpoints[r.ResourceName]; ok && old == e {
			m.markResourceUnhealthy(r.ResourceName)
		}
		glog.V(2).Infof("Unregistered endpoint %v", e)
		m.mutex.Unlock()
	}()
}
```
First, addEndpoint checks whether a device plugin with this Resource Name is already registered. If it is, it collects that endpoint's cached devices.
It then dials the device plugin's socket. If the dial fails, the device plugin has not started properly. If the dial succeeds, a new endpoint is initialized from the cached devices. endpointImpl is defined as follows:
```go
type endpointImpl struct {
	client     pluginapi.DevicePluginClient
	clientConn *grpc.ClientConn

	socketPath   string
	resourceName string
	stopTime     time.Time

	devices map[string]pluginapi.Device
	mutex   sync.Mutex
	cb      monitorCallback
}
```
The device plugin may re-register while the new endpointImpl is being initialized. To detect this, after initialization the manager looks up the cached endpoint for this Resource Name again and compares it with the endpoint it saw before initialization:
If they are not the same object, a re-registration happened during initialization. The manager calls stop() on the new endpoint, which closes its gRPC connection and sets its stopTime to the current time, and this registration ends in failure.
Otherwise, the rest of the process continues.
The new endpoint replaces the old one in endpoints. If the device plugin was registered before, then before the new endpoint's run() starts, the old endpoint's stop() is called. This closes the old gRPC connection and sets the old endpoint's stopTime to the current time.
Then a goroutine is started to execute the new endpoint's run(). Inside run():
It calls the device plugin's ListAndWatch gRPC API and keeps receiving from the ListAndWatch stream over the long-lived connection.
It compares the devices received from the stream with the devices cached in the endpoint to work out which devices were added, deleted or updated.
It then calls the endpoint's callback, which is the genericDeviceUpdateCallback registered by ManagerImpl, to update the Device Manager cache and write the checkpoint file.
When the gRPC connection to the device plugin returns an error (errListAndWatch), run() leaves this receive loop. The goroutine then calls the endpoint's stop(), which closes the gRPC connection and sets the endpoint's stopTime to the current time.
After stop() returns, if this endpoint is still the one registered for the Resource Name, all of the plugin's devices are marked unhealthy. healthyDevices for that resource is emptied, and every previously healthy device is moved to unhealthyDevices. As a result, the resource can no longer be allocated.
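The comparison that run() performs between each ListAndWatch response and the endpoint's cached devices can be modeled as a pure diff. This sketch rests on several assumptions. Device keeps only an ID and a health string. A device counts as updated only when its health changes. diffDevices is a hypothetical name. Its three result slices correspond to the added, updated and deleted arguments of monitorCallback.

```go
package main

import (
	"fmt"
	"sort"
)

// Device is a simplified stand-in for pluginapi.Device.
type Device struct {
	ID     string
	Health string // "Healthy" or "Unhealthy"
}

// diffDevices compares the devices reported in one ListAndWatch response with
// the cached ones and returns the new cache plus the added/updated/deleted lists.
func diffDevices(cached map[string]Device, received []Device) (next map[string]Device, added, updated, deleted []Device) {
	next = make(map[string]Device, len(received))
	for _, d := range received {
		old, ok := cached[d.ID]
		switch {
		case !ok:
			added = append(added, d)
		case old.Health != d.Health:
			updated = append(updated, d)
		}
		next[d.ID] = d
	}
	for id, d := range cached {
		if _, ok := next[id]; !ok {
			deleted = append(deleted, d)
		}
	}
	// Map iteration order is random; sort for stable output.
	sort.Slice(deleted, func(i, j int) bool { return deleted[i].ID < deleted[j].ID })
	return next, added, updated, deleted
}

func main() {
	cached := map[string]Device{
		"GPU-0": {"GPU-0", "Healthy"},
		"GPU-1": {"GPU-1", "Healthy"},
	}
	received := []Device{{"GPU-0", "Unhealthy"}, {"GPU-2", "Healthy"}}
	_, added, updated, deleted := diffDevices(cached, received)
	fmt.Println(added, updated, deleted)
	// [{GPU-2 Healthy}] [{GPU-0 Unhealthy}] [{GPU-1 Healthy}]
}
```

Returning the new cache rather than mutating the old one keeps the function easy to test. In the endpoint, the result would replace the cached devices before the callback is invoked.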
Call the Allocate API of Device Plugin
Register UpdatePluginResources as a Pod Admit Handler
In NewMainKubelet, kubelet registers a series of Pod admit handlers. When a Pod is about to be created, these handlers are called first. klet.containerManager.UpdatePluginResources is the handler through which kubelet's Device Manager assigns devices to the Pod.
pkg/kubelet/kubelet.go:893

```go
func NewMainKubelet(...) (*Kubelet, error) {
	...
	klet.admitHandlers.AddPodAdmitHandler(lifecycle.NewPredicateAdmitHandler(klet.getNodeAnyWay, criticalPodAdmissionHandler, klet.containerManager.UpdatePluginResources))
	...
}
```

pkg/kubelet/cm/container_manager_linux.go:618

```go
func (cm *containerManagerImpl) UpdatePluginResources(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	return cm.deviceManager.Allocate(node, attrs)
}
```

Allocate
Before creating a Pod, kubelet invokes Device Manager's Allocate method to assign devices for each container's requests in the Pod. Kubelet forwards each request to the Allocate method of the corresponding endpoint, which in turn sends it to the device plugin.
pkg/kubelet/cm/devicemanager/manager.go:259

```go
func (m *ManagerImpl) Allocate(node *schedulercache.NodeInfo, attrs *lifecycle.PodAdmitAttributes) error {
	pod := attrs.Pod
	devicesToReuse := make(map[string]sets.String)
	// TODO: Reuse devices between init containers and regular containers.
	for _, container := range pod.Spec.InitContainers {
		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
			return err
		}
		m.podDevices.addContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
	}
	for _, container := range pod.Spec.Containers {
		if err := m.allocateContainerResources(pod, &container, devicesToReuse); err != nil {
			return err
		}
		m.podDevices.removeContainerAllocatedResources(string(pod.UID), container.Name, devicesToReuse)
	}

	m.mutex.Lock()
	defer m.mutex.Unlock()

	// quick return if no pluginResources requested
	if _, podRequireDevicePluginResource := m.podDevices[string(pod.UID)]; !podRequireDevicePluginResource {
		return nil
	}

	m.sanitizeNodeAllocatable(node)
	return nil
}
```
Call allocateContainerResources to assign devices to each init container in the Pod and update the podDevices cache in ManagerImpl.
Call allocateContainerResources to assign devices to each regular container in the Pod and update the podDevices cache in ManagerImpl.
Call sanitizeNodeAllocatable to update, in the scheduler cache, the Node's allocatable amount of each Resource Name.
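The way init containers and regular containers interact through devicesToReuse is easiest to see in a toy model. All names in the sketch below are hypothetical, and it models only a single resource. Each container first takes devices from the reuse pool and then from the free pool. The reuse-first preference is an assumption about devicesToAllocate. Devices given to an init container are added to the reuse pool. Devices given to a regular container are removed from it. This mirrors the addContainerAllocatedResources and removeContainerAllocatedResources calls in the Allocate code above.

```go
package main

import (
	"fmt"
	"sort"
)

// toyAllocator models a single extended resource.
type toyAllocator struct {
	free  []string        // unallocated device IDs, in handout order
	reuse map[string]bool // plays the role of devicesToReuse[resource]
}

// allocate hands out `needed` devices, preferring the reuse pool, and fails
// without changing any state if there are not enough devices.
func (a *toyAllocator) allocate(needed int) ([]string, error) {
	reusable := make([]string, 0, len(a.reuse))
	for id := range a.reuse {
		reusable = append(reusable, id)
	}
	sort.Strings(reusable)
	if len(reusable) > needed {
		reusable = reusable[:needed]
	}
	fromFree := needed - len(reusable)
	if fromFree > len(a.free) {
		return nil, fmt.Errorf("requested %d devices, only %d available", needed, len(reusable)+len(a.free))
	}
	got := append(reusable, a.free[:fromFree]...)
	a.free = a.free[fromFree:]
	return got, nil
}

// allocatePod follows the order of Allocate: init containers first, then
// regular containers. Each slice entry is the device count one container needs.
func (a *toyAllocator) allocatePod(initNeeds, appNeeds []int) ([][]string, [][]string, error) {
	var initGot, appGot [][]string
	for _, n := range initNeeds {
		got, allocErr := a.allocate(n)
		if allocErr != nil {
			return nil, nil, allocErr
		}
		for _, id := range got {
			a.reuse[id] = true // like addContainerAllocatedResources
		}
		initGot = append(initGot, got)
	}
	for _, n := range appNeeds {
		got, allocErr := a.allocate(n)
		if allocErr != nil {
			return nil, nil, allocErr
		}
		for _, id := range got {
			delete(a.reuse, id) // like removeContainerAllocatedResources
		}
		appGot = append(appGot, got)
	}
	return initGot, appGot, nil
}

func main() {
	a := &toyAllocator{free: []string{"d0", "d1", "d2", "d3"}, reuse: map[string]bool{}}
	initGot, appGot, err := a.allocatePod([]int{2}, []int{2, 1})
	if err != nil {
		panic(err)
	}
	fmt.Println(initGot, appGot, a.free) // [[d0 d1]] [[d0 d1] [d2]] [d3]
}
```

Init containers run to completion before regular containers start, so a regular container can safely reuse an init container's devices. Two regular containers run concurrently, which is why devices are removed from the reuse pool once a regular container takes them.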
allocateContainerResources
pkg/kubelet/cm/devicemanager/manager.go:608

```go
func (m *ManagerImpl) allocateContainerResources(pod *v1.Pod, container *v1.Container, devicesToReuse map[string]sets.String) error {
	podUID := string(pod.UID)
	contName := container.Name
	allocatedDevicesUpdated := false
	// Extended resources are not allowed to be overcommitted.
	// Since device plugin advertises extended resources,
	// therefore Requests must be equal to Limits and iterating
	// over the Limits should be sufficient.
	for k, v := range container.Resources.Limits {
		resource := string(k)
		needed := int(v.Value())
		glog.V(3).Infof("needs %d %s", needed, resource)
		if !m.isDevicePluginResource(resource) {
			continue
		}
		// Updates allocatedDevices to garbage collect any stranded resources
		// before doing the device plugin allocation.
		if !allocatedDevicesUpdated {
			m.updateAllocatedDevices(m.activePods())
			allocatedDevicesUpdated = true
		}
		allocDevices, err := m.devicesToAllocate(podUID, contName, resource, needed, devicesToReuse[resource])
		if err != nil {
			return err
		}
		if allocDevices == nil || len(allocDevices)
```