Implementing create, delete, update, and query operations against etcd in Kubernetes

Updated: 2025-02-24. Source: SLTechnology News&Howtos (Shulou), Servers section.



Kubernetes stores its data centrally in etcd. This article looks at how read consistency, update consistency, and transactions are implemented on top of etcd.

1. Data storage and versions

1.1 Transforming stored data

In K8s, some data has to be processed before it can be stored. Encrypted data such as a Secret needs at least two operations: encryption on write and decryption on read. The transformer exists to do this. It encrypts the data when it is written to etcd and decrypts it when it is read back.
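The encrypt-on-write, decrypt-on-read idea can be sketched with the Go standard library alone. This is a minimal illustration, not the Kubernetes implementation: the `Transformer` interface below is a simplified, hypothetical version of the one the article refers to, and `gcmTransformer`, the demo key, and the storage key path are all assumptions made for the example. The storage key is passed as additional authenticated data, which loosely mirrors the `authenticatedDataString(key)` argument that appears in the code later in this article.

```go
package main

import (
	"crypto/aes"
	"crypto/cipher"
	"crypto/rand"
	"errors"
	"fmt"
)

// Transformer is a hypothetical, simplified stand-in for the storage
// transformer: it rewrites values on their way to and from storage.
type Transformer interface {
	TransformToStorage(data, context []byte) ([]byte, error)
	TransformFromStorage(data, context []byte) ([]byte, error)
}

// gcmTransformer encrypts with AES-GCM and binds each value to its context
// (here, the storage key) as additional authenticated data.
type gcmTransformer struct {
	aead cipher.AEAD
}

var _ Transformer = (*gcmTransformer)(nil)

func newGCMTransformer(key []byte) (*gcmTransformer, error) {
	block, err := aes.NewCipher(key)
	if err != nil {
		return nil, err
	}
	aead, err := cipher.NewGCM(block)
	if err != nil {
		return nil, err
	}
	return &gcmTransformer{aead: aead}, nil
}

// TransformToStorage encrypts data and prepends the random nonce.
func (t *gcmTransformer) TransformToStorage(data, context []byte) ([]byte, error) {
	nonce := make([]byte, t.aead.NonceSize())
	if _, err := rand.Read(nonce); err != nil {
		return nil, err
	}
	return t.aead.Seal(nonce, nonce, data, context), nil
}

// TransformFromStorage splits off the nonce and decrypts the rest.
func (t *gcmTransformer) TransformFromStorage(data, context []byte) ([]byte, error) {
	n := t.aead.NonceSize()
	if len(data) < n {
		return nil, errors.New("stored value is too short")
	}
	return t.aead.Open(nil, data[:n], data[n:], context)
}

func main() {
	tr, err := newGCMTransformer([]byte("0123456789abcdef")) // 16-byte demo key
	if err != nil {
		panic(err)
	}
	key := []byte("/registry/secrets/default/demo") // hypothetical storage key
	stored, err := tr.TransformToStorage([]byte("s3cret"), key)
	if err != nil {
		panic(err)
	}
	plain, err := tr.TransformFromStorage(stored, key)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(plain))
}
```

Because the storage key is authenticated together with the ciphertext, a value copied to a different key fails to decrypt in this sketch.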

1.2 Resource version revision

Every modification in etcd (create, update, delete) increments the revision, and K8s uses this value as the ResourceVersion of the resource. This mechanism is also the key to implementing watch. When data read from etcd is decoded, the versioner component sets this value on the resource object.
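To make the revision idea concrete, here is a toy in-memory model, assuming a single store-wide counter that every modification bumps. `toyStore`, `entry`, and the method names are hypothetical and exist only for this sketch; it does not talk to etcd.

```go
package main

import (
	"fmt"
	"strconv"
)

// entry is one stored value plus the revision at which it was last modified.
type entry struct {
	value       string
	modRevision int64
}

// toyStore is a hypothetical in-memory model of a revisioned key-value
// store. It is not the etcd client and is not safe for concurrent use.
type toyStore struct {
	revision int64 // store-wide counter, bumped by every modification
	data     map[string]entry
}

func newToyStore() *toyStore {
	return &toyStore{data: make(map[string]entry)}
}

// Put creates or updates a key and returns the new store revision.
func (s *toyStore) Put(key, value string) int64 {
	s.revision++
	s.data[key] = entry{value: value, modRevision: s.revision}
	return s.revision
}

// Delete removes a key. Deleting an existing key also bumps the revision.
func (s *toyStore) Delete(key string) int64 {
	if _, ok := s.data[key]; ok {
		s.revision++
		delete(s.data, key)
	}
	return s.revision
}

// ResourceVersion reports the key's last-modified revision as a string,
// mirroring how a resource version is exposed on an object.
func (s *toyStore) ResourceVersion(key string) (string, bool) {
	e, ok := s.data[key]
	if !ok {
		return "", false
	}
	return strconv.FormatInt(e.modRevision, 10), true
}

func main() {
	s := newToyStore()
	s.Put("/pods/a", "v1")
	s.Put("/pods/b", "v1")
	s.Put("/pods/a", "v2")
	rvA, _ := s.ResourceVersion("/pods/a")
	rvB, _ := s.ResourceVersion("/pods/b")
	fmt.Println(rvA, rvB, s.Delete("/pods/b"))
}
```

Note that the counter belongs to the store, not to the key: updating one key advances the revision that the next write to any other key will receive.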

1.3 Mapping of data models

Data read from etcd is just a byte array. How is it converted into a real runtime object? Recall Scheme and Codec from earlier: once we know the encoding format of the data and the type of the resource object, the codec can decode the byte array into an object of the target type.
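The decode step can be approximated as follows. This sketch assumes JSON as the encoding and uses a made-up `pod` type and `versioned` interface; the real codec and versioner are considerably more general. It shows the two things described so far: bytes become a typed object, and the revision is stamped onto that object afterwards.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// pod is a hypothetical, heavily reduced resource type.
type pod struct {
	Name            string `json:"name"`
	Image           string `json:"image"`
	ResourceVersion string `json:"resourceVersion,omitempty"`
}

// versioned is implemented by objects that can carry a resource version.
type versioned interface {
	SetResourceVersion(rv string)
}

func (p *pod) SetResourceVersion(rv string) { p.ResourceVersion = rv }

// decode unmarshals the raw bytes into out (which must be a pointer) and
// then sets the storage revision on it as the resource version.
func decode(data []byte, out versioned, rev int64) error {
	if err := json.Unmarshal(data, out); err != nil {
		return err
	}
	out.SetResourceVersion(strconv.FormatInt(rev, 10))
	return nil
}

func main() {
	raw := []byte(`{"name":"web","image":"nginx"}`)
	var p pod
	if err := decode(raw, &p, 42); err != nil {
		panic(err)
	}
	fmt.Printf("%+v\n", p)
}
```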

2. Consistency of the query interface

Writes in etcd go through a single leader and are committed once a quorum of the cluster has accepted them, so not every member is guaranteed to hold the latest data at every moment. If the member we read from was not part of that quorum and the read is served from its local state, we may temporarily see stale data. For scenarios that need strong consistency, we can use the revision mechanism to make sure the data we read is at least as new as the version we expect.
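The minimum-revision check described above can be sketched in isolation. The function below is a hypothetical standalone version written for this article, not the method on the Kubernetes store; it assumes the resource version is a decimal string and that an empty string means "no minimum requested".

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// errTooLarge signals that the caller asked for data newer than what the
// store has returned so far.
var errTooLarge = errors.New("requested resource version is newer than the returned revision")

// ensureMinimumRevision is a hypothetical helper: it rejects a read whose
// revision is older than the minimum resource version the caller requires.
func ensureMinimumRevision(minResourceVersion string, actualRevision uint64) error {
	if minResourceVersion == "" {
		return nil // no minimum requested
	}
	minRV, err := strconv.ParseUint(minResourceVersion, 10, 64)
	if err != nil {
		return fmt.Errorf("invalid resource version %q: %w", minResourceVersion, err)
	}
	if minRV > actualRevision {
		return errTooLarge
	}
	return nil
}

func main() {
	fmt.Println(ensureMinimumRevision("", 7))  // no constraint
	fmt.Println(ensureMinimumRevision("5", 7)) // read is new enough
	fmt.Println(ensureMinimumRevision("9", 7)) // read is too old
}
```

A caller that receives the "too old" error can retry the read or report the condition, rather than silently acting on stale data.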

    // Non-core code omitted.
    func (s *store) Get(ctx context.Context, key string, resourceVersion string, out runtime.Object, ignoreNotFound bool) error {
        // Fetch the key.
        getResp, err := s.client.KV.Get(ctx, key, s.getOps...)

        // Check that the returned revision has reached the requested minimum version.
        if err = s.ensureMinimumResourceVersion(resourceVersion, uint64(getResp.Header.Revision)); err != nil {
            return err
        }

        // Transform the stored data (kv is the key-value pair taken from getResp; that step is omitted here).
        data, _, err := s.transformer.TransformFromStorage(kv.Value, authenticatedDataString(key))
        if err != nil {
            return storage.NewInternalError(err.Error())
        }
        // Decode the data.
        return decode(s.codec, s.versioner, data, out, kv.ModRevision)
    }

3. Implementation of the create interface

Before writing anything, the create interface checks the resource object to avoid creating it twice. It first does a preliminary check on the object's version field, and then relies on etcd's transaction mechanism to make the creation atomic.
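The two-stage check can be modelled without etcd. In this sketch, `toyStore` is a hypothetical in-memory store whose mutex stands in for the atomicity that an etcd transaction provides, and a mod revision of 0 is taken to mean "key does not exist". None of the names come from Kubernetes.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errVersionSet = errors.New("resourceVersion should not be set on objects to be created")
	errKeyExists  = errors.New("key already exists")
)

type entry struct {
	value       string
	modRevision int64 // 0 means the key does not exist
}

// toyStore is a hypothetical in-memory stand-in for etcd.
type toyStore struct {
	mu       sync.Mutex
	revision int64
	data     map[string]entry
}

func newToyStore() *toyStore {
	return &toyStore{data: make(map[string]entry)}
}

// Create first rejects objects that already carry a version, then writes
// the value only if the key is absent. Check and write happen under one
// lock, which models: If(ModRevision(key) == 0).Then(Put).
func (s *toyStore) Create(key, value string, resourceVersion int64) (int64, error) {
	if resourceVersion != 0 {
		return 0, errVersionSet // preliminary check on the object itself
	}
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.data[key].modRevision != 0 {
		return 0, errKeyExists // the comparison failed: someone created it first
	}
	s.revision++
	s.data[key] = entry{value: value, modRevision: s.revision}
	return s.revision, nil
}

func main() {
	s := newToyStore()
	rev, err := s.Create("/pods/a", "v1", 0)
	fmt.Println(rev, err)
	_, err = s.Create("/pods/a", "v2", 0)
	fmt.Println(err)
}
```

The important property is that the existence check and the write cannot be separated by another writer, which is what the transaction gives the real implementation.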

    // Non-core code omitted.
    func (s *store) Create(ctx context.Context, key string, obj, out runtime.Object, ttl uint64) error {
        if version, err := s.versioner.ObjectResourceVersion(obj); err == nil && version != 0 {
            return errors.New("resourceVersion should not be set on objects to be created")
        }
        if err := s.versioner.PrepareObjectForStorage(obj); err != nil {
            return fmt.Errorf("PrepareObjectForStorage failed: %v", err)
        }
        // Encode the data.
        data, err := runtime.Encode(s.codec, obj)
        if err != nil {
            return err
        }

        // Transform the data.
        newData, err := s.transformer.TransformToStorage(data, authenticatedDataString(key))
        if err != nil {
            return storage.NewInternalError(err.Error())
        }

        startTime := time.Now()
        // Transaction.
        txnResp, err := s.client.KV.Txn(ctx).If(
            notFound(key), // uses etcd's ModRevision: a value of 0 means the key does not exist
        ).Then(
            clientv3.OpPut(key, string(newData), opts...), // write the data
        ).Commit()
        metrics.RecordEtcdRequestLatency("create", getTypeName(obj), startTime)
        if err != nil {
            return err
        }
        if !txnResp.Succeeded {
            return storage.NewKeyExistsError(key, 0)
        }

        if out != nil {
            // Get the revision of the put.
            putResp := txnResp.Responses[0].GetResponsePut()
            return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
        }
        return nil
    }

    func notFound(key string) clientv3.Cmp {
        return clientv3.Compare(clientv3.ModRevision(key), "=", 0)
    }

4. Implementation of the delete interface

The delete interface is built on CAS (compare-and-swap) and etcd's transaction mechanism. This keeps etcd consistent even when the same resource is deleted concurrently: at least one of the competing nodes succeeds.
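A compare-and-swap delete with retry can be illustrated with a toy store. Everything below is hypothetical and in-memory: the mutex plays the role of the transaction, and `deleteIfUnmodified` models a transaction of the form "if the mod revision still equals the one I read, delete; otherwise hand me the current state". In this simplified model a competing deleter that loses the race sees the key as gone and reports not found.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

var errNotFound = errors.New("key not found")

type entry struct {
	value       string
	modRevision int64 // 0 means the key does not exist
}

// toyStore is a hypothetical in-memory stand-in for etcd.
type toyStore struct {
	mu       sync.Mutex
	revision int64
	data     map[string]entry
}

func newToyStore() *toyStore { return &toyStore{data: make(map[string]entry)} }

func (s *toyStore) put(key, value string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.revision++
	s.data[key] = entry{value: value, modRevision: s.revision}
}

func (s *toyStore) get(key string) entry {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.data[key]
}

// deleteIfUnmodified models: If(ModRevision == rev).Then(delete).Else(get).
func (s *toyStore) deleteIfUnmodified(key string, rev int64) (bool, entry) {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur := s.data[key]
	if cur.modRevision != rev {
		return false, cur
	}
	s.revision++
	delete(s.data, key)
	return true, cur
}

// conditionalDelete retries until it deletes the key or sees that it is gone.
func conditionalDelete(s *toyStore, key string) (string, error) {
	cur := s.get(key)
	for {
		if cur.modRevision == 0 {
			return "", errNotFound
		}
		ok, latest := s.deleteIfUnmodified(key, cur.modRevision)
		if ok {
			return cur.value, nil
		}
		cur = latest // conflict: retry against the state the store returned
	}
}

// concurrentDeletes runs n deleters at once and counts the successes.
func concurrentDeletes(s *toyStore, key string, n int) int32 {
	var wg sync.WaitGroup
	var successes int32
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := conditionalDelete(s, key); err == nil {
				atomic.AddInt32(&successes, 1)
			}
		}()
	}
	wg.Wait()
	return successes
}

func main() {
	s := newToyStore()
	s.put("/pods/a", "v1")
	fmt.Println("successful deletes:", concurrentDeletes(s, "/pods/a", 8))
}
```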

    // Non-core code omitted.
    func (s *store) conditionalDelete(ctx context.Context, key string, out runtime.Object, v reflect.Value, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc) error {
        startTime := time.Now()
        // Fetch the current data for the key.
        getResp, err := s.client.KV.Get(ctx, key)
        for {
            // Build the current state.
            origState, err := s.getState(getResp, key, v, false)
            if err != nil {
                return err
            }
            txnResp, err := s.client.KV.Txn(ctx).If(
                clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev), // attempt the delete only if the mod revision still equals the current state
            ).Then(
                clientv3.OpDelete(key), // delete
            ).Else(
                clientv3.OpGet(key), // get
            ).Commit()
            if !txnResp.Succeeded {
                // Take the latest data from the transaction and retry.
                getResp = (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
                klog.V(4).Infof("deletion of %s failed because of a conflict, going to retry", key)
                continue
            }
            // Decode the last version of the data into out, then return.
            return decode(s.codec, s.versioner, origState.data, out, origState.rev)
        }
    }

5. Implementation of the update interface

Update is not fundamentally different from delete: if multiple nodes update at the same time, one of the concurrent CAS operations is bound to succeed. When a node finds that the stored data already matches what it was about to write, it has nothing more to do and simply returns.
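The retry loop behind such an update can be sketched with the same kind of toy store. The names `toyStore`, `putIfUnmodified`, `guaranteedUpdate`, and `increment` are hypothetical; the mutex stands in for the transaction, and the values are plain strings rather than encoded objects. The sketch shows both behaviors described above: a conflicting writer recomputes from fresh state, and an update that would change nothing returns without writing.

```go
package main

import (
	"fmt"
	"strconv"
	"sync"
)

type entry struct {
	value       string
	modRevision int64 // 0 means the key does not exist
}

// toyStore is a hypothetical in-memory stand-in for etcd.
type toyStore struct {
	mu       sync.Mutex
	revision int64
	data     map[string]entry
}

func newToyStore() *toyStore { return &toyStore{data: make(map[string]entry)} }

func (s *toyStore) get(key string) entry {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.data[key]
}

// putIfUnmodified models: If(ModRevision == rev).Then(put).Else(get).
func (s *toyStore) putIfUnmodified(key, value string, rev int64) (bool, entry) {
	s.mu.Lock()
	defer s.mu.Unlock()
	cur := s.data[key]
	if cur.modRevision != rev {
		return false, cur
	}
	s.revision++
	next := entry{value: value, modRevision: s.revision}
	s.data[key] = next
	return true, next
}

// guaranteedUpdate applies tryUpdate until the write lands or is unnecessary.
func guaranteedUpdate(s *toyStore, key string, tryUpdate func(old string) string) entry {
	cur := s.get(key)
	for {
		next := tryUpdate(cur.value)
		if cur.modRevision != 0 && next == cur.value {
			return cur // nothing would change, so skip the write
		}
		ok, latest := s.putIfUnmodified(key, next, cur.modRevision)
		if ok {
			return latest
		}
		cur = latest // conflict: recompute from the state the store returned
	}
}

// increment treats the value as a decimal counter; a missing key ("") is 0.
func increment(old string) string {
	n, _ := strconv.Atoi(old)
	return strconv.Itoa(n + 1)
}

// concurrentIncrements runs n updaters against the same key at once.
func concurrentIncrements(s *toyStore, key string, n int) {
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			guaranteedUpdate(s, key, increment)
		}()
	}
	wg.Wait()
}

func main() {
	s := newToyStore()
	concurrentIncrements(s, "/counter", 20)
	fmt.Println(s.get("/counter").value)
}
```

No increment is lost in this model because each writer only commits against the exact revision it read; a writer that loses the race recomputes its result from the newer value.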

    // Non-core code omitted.
    func (s *store) GuaranteedUpdate(
        ctx context.Context, key string, out runtime.Object, ignoreNotFound bool,
        preconditions *storage.Preconditions, tryUpdate storage.UpdateFunc, suggestion ...runtime.Object) error {

        // Helper that fetches the latest data for the key.
        getCurrentState := func() (*objState, error) {
            startTime := time.Now()
            getResp, err := s.client.KV.Get(ctx, key, s.getOps...)
            metrics.RecordEtcdRequestLatency("get", getTypeName(out), startTime)
            if err != nil {
                return nil, err
            }
            return s.getState(getResp, key, v, ignoreNotFound)
        }

        // Obtain the starting state.
        var origState *objState
        var mustCheckData bool
        if len(suggestion) == 1 && suggestion[0] != nil {
            // If a suggested object was supplied, use it ...
            origState, err = s.getStateFromObject(suggestion[0])
            if err != nil {
                return err
            }
            // ... but remember that it still has to be verified.
            mustCheckData = true
        } else {
            // Otherwise fetch the data from etcd.
            origState, err = getCurrentState()
            if err != nil {
                return err
            }
        }

        transformContext := authenticatedDataString(key)
        for {
            // Check whether the object has been updated, mainly by comparing UID and revision.
            if err := preconditions.Check(key, origState.obj); err != nil {
                // If our data is already up to date, return the error.
                if !mustCheckData {
                    return err
                }
                // The check may have failed on stale data, so fetch the state again.
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                mustCheckData = false
                // Retry.
                continue
            }

            // Compute the updated object (the current revision is stripped before storage).
            ret, ttl, err := s.updateState(origState, tryUpdate)
            if err != nil {
                // If our data is already up to date, return the error.
                if !mustCheckData {
                    return err
                }
                // It's possible we were working with stale data.
                // Actually fetch.
                origState, err = getCurrentState()
                if err != nil {
                    return err
                }
                mustCheckData = false
                // Retry.
                continue
            }

            // Encode the data.
            data, err := runtime.Encode(s.codec, ret)
            if err != nil {
                return err
            }
            if !origState.stale && bytes.Equal(data, origState.data) {
                // The new data is identical to what we already have, so the write can be skipped.
                if mustCheckData {
                    origState, err = getCurrentState()
                    if err != nil {
                        return err
                    }
                    mustCheckData = false
                    if !bytes.Equal(data, origState.data) {
                        // Original data changed, restart loop.
                        continue
                    }
                }
                if !origState.stale {
                    // Return the existing data directly.
                    return decode(s.codec, s.versioner, origState.data, out, origState.rev)
                }
            }

            // Transform the data.
            newData, err := s.transformer.TransformToStorage(data, transformContext)
            if err != nil {
                return storage.NewInternalError(err.Error())
            }

            opts, err := s.ttlOpts(ctx, int64(ttl))
            if err != nil {
                return err
            }
            trace.Step("Transaction prepared")

            startTime := time.Now()
            // Update the data in a transaction.
            txnResp, err := s.client.KV.Txn(ctx).If(
                clientv3.Compare(clientv3.ModRevision(key), "=", origState.rev),
            ).Then(
                clientv3.OpPut(key, string(newData), opts...),
            ).Else(
                clientv3.OpGet(key),
            ).Commit()
            metrics.RecordEtcdRequestLatency("update", getTypeName(out), startTime)
            if err != nil {
                return err
            }
            trace.Step("Transaction committed")
            if !txnResp.Succeeded {
                // Conflict: rebuild the state from the data the transaction returned.
                getResp := (*clientv3.GetResponse)(txnResp.Responses[0].GetResponseRange())
                klog.V(4).Infof("GuaranteedUpdate of %s failed because of a conflict, going to retry", key)
                origState, err = s.getState(getResp, key, v, ignoreNotFound)
                if err != nil {
                    return err
                }
                trace.Step("Retry value restored")
                mustCheckData = false
                continue
            }
            // Get the put response.
            putResp := txnResp.Responses[0].GetResponsePut()

            return decode(s.codec, s.versioner, data, out, putResp.Header.Revision)
        }
    }


© 2024 shulou.com SLNews company. All rights reserved.