2025-01-16 Update From: SLTechnology News&Howtos
Shulou(Shulou.com)05/31 Report--
This article explains how the RPC function is implemented in Ethereum (the go-ethereum client). The approach is simple and practical; the sections below walk through the principle and the code.
Principles and implementation of Ethereum RPC
JSON-RPC is the standard interface for external calls into a blockchain node, and Ethereum implements it as well. The lower layer supports four transports: InProc, IPC, HTTP, and WebSocket. On top of regular method calls, the upper layer also implements Pub/Sub.
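To make the wire format concrete, here is a minimal sketch of a JSON-RPC 2.0 request and response in Go. The struct names and the helper functions `encodeRequest` and `decodeResult` are hypothetical and are not part of go-ethereum; only the field names (`jsonrpc`, `id`, `method`, `params`, `result`) come from the JSON-RPC 2.0 format, and the method name follows the `namespace_method` convention used by the node.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// request mirrors the JSON-RPC 2.0 request object.
type request struct {
	Version string            `json:"jsonrpc"`
	ID      int               `json:"id"`
	Method  string            `json:"method"`
	Params  []json.RawMessage `json:"params"`
}

// response mirrors a successful JSON-RPC 2.0 response object.
type response struct {
	Version string          `json:"jsonrpc"`
	ID      int             `json:"id"`
	Result  json.RawMessage `json:"result"`
}

// encodeRequest (hypothetical helper) builds the wire form of a call
// that takes no parameters.
func encodeRequest(id int, method string) (string, error) {
	b, err := json.Marshal(request{
		Version: "2.0",
		ID:      id,
		Method:  method,
		Params:  []json.RawMessage{},
	})
	return string(b), err
}

// decodeResult (hypothetical helper) extracts a string result from a
// response payload.
func decodeResult(payload string) (string, error) {
	var resp response
	if err := json.Unmarshal([]byte(payload), &resp); err != nil {
		return "", err
	}
	var out string
	err := json.Unmarshal(resp.Result, &out)
	return out, err
}

func main() {
	req, _ := encodeRequest(1, "web3_clientVersion")
	fmt.Println(req)

	// The result string here is made up for the example.
	res, _ := decodeResult(`{"jsonrpc":"2.0","id":1,"result":"example/v0.0.0"}`)
	fmt.Println(res)
}
```

The same request body can be sent over any of the four transports; only the framing around it differs.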
API publication
The API interfaces are spread across the modules and fall into two groups:
1. Services hard-coded directly in Node (admin, web3, debug, etc.).
2. Structures that implement the Service interface; each registered service exposes its APIs through the APIs() method.
    // file: go-ethereum/node/node.go
    func (n *Node) startRPC(services map[reflect.Type]Service) error {
        apis := n.apis()
        for _, service := range services {
            apis = append(apis, service.APIs()...)
        }
        // ... (remainder omitted in the excerpt)
    }
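The collection step can be tried in isolation with simplified stand-in types. In the sketch below, `API`, `Service`, `ethLike`, `statsLike`, and `collectAPIs` are all hypothetical names for illustration; the real `rpc.API` carries more fields (Version, Service) than the two kept here.

```go
package main

import "fmt"

// API is a simplified stand-in for rpc.API (only two fields kept).
type API struct {
	Namespace string
	Public    bool
}

// Service is a simplified stand-in for the node's Service interface.
type Service interface {
	APIs() []API
}

// ethLike and statsLike are hypothetical services used for illustration.
type ethLike struct{}

func (ethLike) APIs() []API {
	return []API{{Namespace: "eth", Public: true}, {Namespace: "miner", Public: false}}
}

type statsLike struct{}

// A service may expose no APIs at all.
func (statsLike) APIs() []API { return nil }

// collectAPIs starts from the built-in list and appends whatever each
// registered service offers, in registration order.
func collectAPIs(builtin []API, services []Service) []API {
	apis := append([]API{}, builtin...) // copy so the caller's slice is untouched
	for _, s := range services {
		apis = append(apis, s.APIs()...)
	}
	return apis
}

func main() {
	builtin := []API{{Namespace: "admin"}, {Namespace: "web3", Public: true}}
	for _, a := range collectAPIs(builtin, []Service{ethLike{}, statsLike{}}) {
		fmt.Println(a.Namespace, a.Public)
	}
}
```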
The APIs hard-coded in Node:
    // APIs hard-coded in Node
    func (n *Node) apis() []rpc.API {
        return []rpc.API{
            {
                Namespace: "admin",
                Version:   "1.0",
                Service:   NewPrivateAdminAPI(n),
            }, {
                Namespace: "admin",
                Version:   "1.0",
                Service:   NewPublicAdminAPI(n),
                Public:    true,
            }, {
                Namespace: "debug",
                Version:   "1.0",
                Service:   debug.Handler,
            }, {
                Namespace: "debug",
                Version:   "1.0",
                Service:   NewPublicDebugAPI(n),
                Public:    true,
            }, {
                Namespace: "web3",
                Version:   "1.0",
                Service:   NewPublicWeb3API(n),
                Public:    true,
            },
        }
    }
The APIs() implementation of the Ethereum service; other services (dashboard, ethstats) are similar:
    // APIs() implementation of the Ethereum service
    func (s *Ethereum) APIs() []rpc.API {
        apis := ethapi.GetAPIs(s.ApiBackend)

        // Append any APIs exposed explicitly by the consensus engine
        apis = append(apis, s.engine.APIs(s.BlockChain())...)

        // Append all the local APIs and return
        return append(apis, []rpc.API{
            {
                Namespace: "eth",
                Version:   "1.0",
                Service:   NewPublicEthereumAPI(s),
                Public:    true,
            }, {
                Namespace: "eth",
                Version:   "1.0",
                Service:   NewPublicMinerAPI(s),
                Public:    true,
            }, {
                Namespace: "eth",
                Version:   "1.0",
                Service:   downloader.NewPublicDownloaderAPI(s.protocolManager.downloader, s.eventMux),
                Public:    true,
            }, {
                Namespace: "miner",
                Version:   "1.0",
                Service:   NewPrivateMinerAPI(s),
                Public:    false,
            }, {
                Namespace: "eth",
                Version:   "1.0",
                Service:   filters.NewPublicFilterAPI(s.ApiBackend, false),
                Public:    true,
            }, {
                Namespace: "admin",
                Version:   "1.0",
                Service:   NewPrivateAdminAPI(s),
            }, {
                Namespace: "debug",
                Version:   "1.0",
                Service:   NewPublicDebugAPI(s),
                Public:    true,
            }, {
                Namespace: "debug",
                Version:   "1.0",
                Service:   NewPrivateDebugAPI(s.chainConfig, s),
            }, {
                Namespace: "net",
                Version:   "1.0",
                Service:   s.netRPCService,
                Public:    true,
            },
        }...)
    }
The Service here is just a value of some type; it must be registered with the Server. Registration reflects over that type and extracts each method's name (with its first letter lowercased), parameter types, return types, and so on. Every method that qualifies is recorded in the resulting service instance, either as a callback or as a subscription.
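The reflection idea can be sketched in isolation. The example below is a simplified illustration, not the go-ethereum code: `Calc`, `lowerFirst`, `discover`, and `call` are hypothetical names, and the sketch skips the checks on argument and return types that the real registration performs.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
	"unicode"
)

// Calc is a hypothetical service; only its exported methods are discoverable.
type Calc struct{}

func (Calc) Add(a, b int) int { return a + b }

func (Calc) Version() string { return "1.0" }

func (Calc) hidden() {}

// lowerFirst lowercases the first rune, e.g. "Add" becomes "add".
func lowerFirst(name string) string {
	r := []rune(name)
	if len(r) > 0 {
		r[0] = unicode.ToLower(r[0])
	}
	return string(r)
}

// discover maps "namespace_method" to the reflected method of rcvr.
func discover(namespace string, rcvr interface{}) map[string]reflect.Method {
	typ := reflect.TypeOf(rcvr)
	out := make(map[string]reflect.Method)
	// For a non-interface type, NumMethod counts exported methods only.
	for i := 0; i < typ.NumMethod(); i++ {
		m := typ.Method(i)
		out[namespace+"_"+lowerFirst(m.Name)] = m
	}
	return out
}

// call invokes a discovered method; the receiver is always argument 0.
// It panics if the argument types do not match the method signature.
func call(rcvr interface{}, m reflect.Method, args ...interface{}) []reflect.Value {
	in := []reflect.Value{reflect.ValueOf(rcvr)}
	for _, a := range args {
		in = append(in, reflect.ValueOf(a))
	}
	return m.Func.Call(in)
}

func main() {
	methods := discover("calc", Calc{})
	names := make([]string, 0, len(methods))
	for n := range methods {
		names = append(names, n)
	}
	sort.Strings(names)
	fmt.Println(names)
	fmt.Println(call(Calc{}, methods["calc_add"], 2, 3)[0].Int())
}
```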
    type service struct {
        name          string        // name for service
        typ           reflect.Type  // receiver type
        callbacks     callbacks     // registered handlers
        subscriptions subscriptions // available subscriptions/notifications
    }

    // Use reflection to sort a service's methods into callbacks and subscriptions
    // file: go-ethereum/rpc/utils.go
    func suitableCallbacks(rcvr reflect.Value, typ reflect.Type) (callbacks, subscriptions) {
        callbacks := make(callbacks)
        subscriptions := make(subscriptions)

    METHODS:
        for m := 0; m < typ.NumMethod(); m++ {
            method := typ.Method(m)
            mtype := method.Type
            // convert the name to lowercase
            mname := formatName(method.Name)
            if method.PkgPath != "" { // method must be exported
                continue
            }

            var h callback
            // decide whether this is a subscription, mainly from the second
            // input parameter and the first return value of the signature
            h.isSubscribe = isPubSub(mtype)
            h.rcvr = rcvr
            h.method = method
            h.errPos = -1

            firstArg := 1
            numIn := mtype.NumIn()
            if numIn >= 2 && mtype.In(1) == contextType {
                h.hasCtx = true
                firstArg = 2
            }

            if h.isSubscribe { // subscription type
                h.argTypes = make([]reflect.Type, numIn-firstArg) // skip rcvr type
                for i := firstArg; i < numIn; i++ {
                    argType := mtype.In(i)
                    if isExportedOrBuiltinType(argType) {
                        h.argTypes[i-firstArg] = argType
                    } else {
                        continue METHODS
                    }
                }

                subscriptions[mname] = &h
                continue METHODS
            }

            // determine method arguments, ignore first arg since it's the receiver type
            // Arguments must be exported or builtin types
            h.argTypes = make([]reflect.Type, numIn-firstArg)
            for i := firstArg; i < numIn; i++ {
                argType := mtype.In(i)
                if !isExportedOrBuiltinType(argType) {
                    continue METHODS
                }
                h.argTypes[i-firstArg] = argType
            }

            // check that all returned values are exported or builtin types
            for i := 0; i < mtype.NumOut(); i++ {
                if !isExportedOrBuiltinType(mtype.Out(i)) {
                    continue METHODS
                }
            }

            // when a method returns an error it must be the last returned value
            h.errPos = -1
            for i := 0; i < mtype.NumOut(); i++ {
                if isErrorType(mtype.Out(i)) {
                    h.errPos = i
                    break
                }
            }

            if h.errPos >= 0 && h.errPos != mtype.NumOut()-1 {
                continue METHODS
            }

            switch mtype.NumOut() {
            case 0, 1, 2:
                if mtype.NumOut() == 2 && h.errPos == -1 { // method must one return value and 1 error
                    continue METHODS
                }
                callbacks[mname] = &h
            }
        }

        return callbacks, subscriptions
    }

Underlying protocols
The lower layer supports four transports: InProc, IPC, HTTP, and WebSocket.
1. InProc creates an RPC Server instance directly and attaches it to the Node, so it can be called in-process.
2. IPC listens on a pipe; each accepted connection is wrapped in a ServerCodec and handed to the Server's ServeCodec method.
    // file: ipc.go
    func (srv *Server) ServeListener(l net.Listener) error {
        for {
            conn, err := l.Accept()
            if netutil.IsTemporaryError(err) {
                log.Warn("RPC accept error", "err", err)
                continue
            } else if err != nil {
                return err
            }
            log.Trace("Accepted connection", "addr", conn.RemoteAddr())
            go srv.ServeCodec(NewJSONCodec(conn), OptionMethodInvocation|OptionSubscriptions)
        }
    }
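The per-connection part of this pattern (one goroutine that decodes requests from a connection and writes replies back until the connection closes) can be sketched without a real socket. The example below is an assumption-laden illustration: `msg`, `serveConn`, and `roundTrip` are hypothetical names, the reply is a made-up echo rather than a real method call, and an in-memory `net.Pipe` stands in for the accepted IPC connection.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net"
)

// msg is a simplified message used for both requests and replies.
type msg struct {
	ID     int    `json:"id"`
	Method string `json:"method,omitempty"`
	Result string `json:"result,omitempty"`
}

// serveConn reads JSON requests from one connection until it is closed
// and answers each with a result that echoes the method name.
func serveConn(conn io.ReadWriteCloser) {
	defer conn.Close()
	dec := json.NewDecoder(conn)
	enc := json.NewEncoder(conn)
	for {
		var req msg
		if err := dec.Decode(&req); err != nil {
			return // EOF or malformed input ends the session
		}
		if err := enc.Encode(msg{ID: req.ID, Result: "handled " + req.Method}); err != nil {
			return
		}
	}
}

// roundTrip sends one request over an in-memory pipe served by serveConn.
func roundTrip(id int, method string) (msg, error) {
	client, server := net.Pipe()
	go serveConn(server) // one goroutine per connection, as in the accept loop
	defer client.Close()

	if err := json.NewEncoder(client).Encode(msg{ID: id, Method: method}); err != nil {
		return msg{}, err
	}
	var resp msg
	err := json.NewDecoder(client).Decode(&resp)
	return resp, err
}

func main() {
	resp, err := roundTrip(1, "eth_blockNumber")
	fmt.Println(resp.ID, resp.Result, err)
}
```

Because the connection stays open, the same loop can also push unsolicited messages to the client, which is why subscriptions are enabled on this kind of transport.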
3. HTTP builds two layers of middleware; the second one receives the message, creates a ServerCodec, and hands it to the Server's ServeSingleRequest method.
    // file: http.go
    func (srv *Server) ServeHTTP(w http.ResponseWriter, r *http.Request) {
        // Permit dumb empty requests for remote health-checks (AWS)
        if r.Method == http.MethodGet && r.ContentLength == 0 && r.URL.RawQuery == "" {
            return
        }
        if code, err := validateRequest(r); err != nil {
            http.Error(w, err.Error(), code)
            return
        }
        // All checks passed, create a codec that reads direct from the request body
        // untilEOF and writes the response to w and order the server to process a
        // single request.
        ctx := context.Background()
        ctx = context.WithValue(ctx, "remote", r.RemoteAddr)
        ctx = context.WithValue(ctx, "scheme", r.Proto)
        ctx = context.WithValue(ctx, "local", r.Host)

        body := io.LimitReader(r.Body, maxRequestContentLength)
        codec := NewJSONCodec(&httpReadWriteNopCloser{body, w})
        defer codec.Close()

        w.Header().Set("content-type", contentType)
        srv.ServeSingleRequest(codec, OptionMethodInvocation, ctx)
    }
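The one-request-per-call shape of the HTTP transport can be exercised with the standard library alone. The sketch below is not the go-ethereum handler: `handler`, `post`, `rpcRequest`, and the `maxBody` limit are hypothetical, the reply is a made-up echo, and `httptest` is used so that no network socket is opened.

```go
package main

import (
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"net/http/httptest"
	"strings"
)

const maxBody = 128 * 1024 // assumed body limit for this sketch

type rpcRequest struct {
	ID     int    `json:"id"`
	Method string `json:"method"`
}

// handler serves exactly one JSON request per HTTP POST.
func handler(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
		return
	}
	var req rpcRequest
	body := io.LimitReader(r.Body, maxBody) // cap how much of the body is read
	if err := json.NewDecoder(body).Decode(&req); err != nil {
		http.Error(w, "invalid request", http.StatusBadRequest)
		return
	}
	w.Header().Set("content-type", "application/json")
	json.NewEncoder(w).Encode(map[string]interface{}{
		"id":     req.ID,
		"result": "handled " + req.Method,
	})
}

// post runs one request through handler and returns the status code and body.
func post(payload string) (int, string) {
	rec := httptest.NewRecorder()
	req := httptest.NewRequest(http.MethodPost, "/", strings.NewReader(payload))
	handler(rec, req)
	return rec.Code, strings.TrimSpace(rec.Body.String())
}

func main() {
	code, body := post(`{"id":1,"method":"net_version"}`)
	fmt.Println(code, body)
}
```

Since each HTTP exchange ends once the response is written, there is no channel left for server-initiated messages, which matches the handler above being started with OptionMethodInvocation only.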
4. WebSocket, like HTTP, builds a middleware (WebsocketHandler); each connection is wrapped in a ServerCodec and handed to the Server's ServeCodec method.
    // file: websocket.go
    func (srv *Server) WebsocketHandler(allowedOrigins []string) http.Handler {
        return websocket.Server{
            Handshake: wsHandshakeValidator(allowedOrigins),
            Handler: func(conn *websocket.Conn) {
                // Create a custom encode/decode pair to enforce payload size and number encoding
                conn.MaxPayloadBytes = maxRequestContentLength

                encoder := func(v interface{}) error {
                    return websocketJSONCodec.Send(conn, v)
                }
                decoder := func(v interface{}) error {
                    return websocketJSONCodec.Receive(conn, v)
                }
                srv.ServeCodec(NewCodec(conn, encoder, decoder), OptionMethodInvocation|OptionSubscriptions)
            },
        }
    }

RPC response
Once any of the four transports has produced a ServerCodec, the codec is handed to the Server, which reads requests from it and responds to them. Every request eventually reaches the handle function, which responds according to the request type.
    func (s *Server) handle(ctx context.Context, codec ServerCodec, req *serverRequest) (interface{}, func()) {
        if req.err != nil {
            return codec.CreateErrorResponse(&req.id, req.err), nil
        }

        if req.isUnsubscribe { // unsubscribe
            if len(req.args) >= 1 && req.args[0].Kind() == reflect.String {
                notifier, supported := NotifierFromContext(ctx) // get the notifier object
                if !supported { // interface doesn't support subscriptions (e.g. http)
                    return codec.CreateErrorResponse(&req.id, &callbackError{ErrNotificationsUnsupported.Error()}), nil
                }

                // Unsubscribe
                subid := ID(req.args[0].String())
                if err := notifier.unsubscribe(subid); err != nil {
                    return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
                }

                return codec.CreateResponse(req.id, true), nil
            }
            return codec.CreateErrorResponse(&req.id, &invalidParamsError{"Expected subscription id as first argument"}), nil
        }

        if req.callb.isSubscribe { // subscribe
            subid, err := s.createSubscription(ctx, codec, req)
            if err != nil {
                return codec.CreateErrorResponse(&req.id, &callbackError{err.Error()}), nil
            }

            // active the subscription after the subid was successfully sent to the client
            activateSub := func() {
                notifier, _ := NotifierFromContext(ctx) // get the notifier object
                notifier.activate(subid, req.svcname)   // activate the subscription
            }

            return codec.CreateResponse(req.id, subid), activateSub
        }

        // regular RPC call, prepare arguments
        if len(req.args) != len(req.callb.argTypes) {
            rpcErr := &invalidParamsError{fmt.Sprintf("%s%s%s expects %d parameters, got %d",
                req.svcname, serviceMethodSeparator, req.callb.method.Name,
                len(req.callb.argTypes), len(req.args))}
            return codec.CreateErrorResponse(&req.id, rpcErr), nil
        }

        arguments := []reflect.Value{req.callb.rcvr}
        if req.callb.hasCtx {
            arguments = append(arguments, reflect.ValueOf(ctx))
        }
        if len(req.args) > 0 {
            arguments = append(arguments, req.args...)
        }

        // execute RPC method and return result
        reply := req.callb.method.Func.Call(arguments)
        if len(reply) == 0 {
            return codec.CreateResponse(req.id, nil), nil
        }

        // check result
        if req.callb.errPos >= 0 { // test if method returned an error
            if !reply[req.callb.errPos].IsNil() {
                e := reply[req.callb.errPos].Interface().(error)
                res := codec.CreateErrorResponse(&req.id, &callbackError{e.Error()})
                return res, nil
            }
        }
        return codec.CreateResponse(req.id, reply[0].Interface()), nil
    }
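The regular-call branch boils down to three steps: check the argument count, call the function through reflection, and split a trailing error from the result. A minimal sketch of just those steps follows; `invoke` and `div` are hypothetical names, and unlike the real server the sketch does not verify argument types before calling (a type mismatch would panic).

```go
package main

import (
	"errors"
	"fmt"
	"reflect"
)

// errorType is the reflect.Type of the built-in error interface.
var errorType = reflect.TypeOf((*error)(nil)).Elem()

// invoke calls fn with args after checking the argument count, then
// separates a trailing error return value from the result.
func invoke(fn interface{}, args ...interface{}) (interface{}, error) {
	v := reflect.ValueOf(fn)
	t := v.Type()
	if len(args) != t.NumIn() {
		return nil, fmt.Errorf("expects %d parameters, got %d", t.NumIn(), len(args))
	}
	in := make([]reflect.Value, len(args))
	for i, a := range args {
		in[i] = reflect.ValueOf(a)
	}
	out := v.Call(in)

	// If the last return value is an error, report it or drop it.
	if n := len(out); n > 0 && t.Out(n-1) == errorType {
		if !out[n-1].IsNil() {
			return nil, out[n-1].Interface().(error)
		}
		out = out[:n-1]
	}
	if len(out) == 0 {
		return nil, nil
	}
	return out[0].Interface(), nil
}

// div is a hypothetical RPC method returning (result, error).
func div(a, b int) (int, error) {
	if b == 0 {
		return 0, errors.New("division by zero")
	}
	return a / b, nil
}

func main() {
	fmt.Println(invoke(div, 6, 3))
	fmt.Println(invoke(div, 1, 0))
	fmt.Println(invoke(div, 1))
}
```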
Pub/sub implementation
The lower layer binds a notifier object into the context:
    if options&OptionSubscriptions == OptionSubscriptions {
        ctx = context.WithValue(ctx, notifierKey{}, newNotifier(codec))
    }
On subscribe or unsubscribe, the notifier object is fetched from the context with ctx.Value, and its methods are called to register or cancel the subscription.
    func NotifierFromContext(ctx context.Context) (*Notifier, bool) {
        n, ok := ctx.Value(notifierKey{}).(*Notifier)
        return n, ok
    }
Registering (activating) a subscription
    func (n *Notifier) activate(id ID, namespace string) {
        n.subMu.Lock()
        defer n.subMu.Unlock()
        if sub, found := n.inactive[id]; found {
            sub.namespace = namespace
            n.active[id] = sub
            delete(n.inactive, id)
        }
    }
Unsubscribing
    func (n *Notifier) unsubscribe(id ID) error {
        n.subMu.Lock()
        defer n.subMu.Unlock()
        if s, found := n.active[id]; found {
            close(s.err)
            delete(n.active, id)
            return nil
        }
        return ErrSubscriptionNotFound
    }
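The two functions above describe a small lifecycle: a subscription starts out inactive, becomes active once its id has been sent to the client, and is removed on unsubscribe. A minimal sketch of that lifecycle follows; `registry`, `create`, and `errNotFound` are hypothetical names, and string ids with bare error channels replace the richer Subscription type.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errNotFound = errors.New("subscription not found")

// registry tracks subscriptions in two maps guarded by one mutex.
type registry struct {
	mu       sync.Mutex
	inactive map[string]chan error
	active   map[string]chan error
}

func newRegistry() *registry {
	return &registry{
		inactive: make(map[string]chan error),
		active:   make(map[string]chan error),
	}
}

// create records a new subscription as inactive.
func (r *registry) create(id string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.inactive[id] = make(chan error)
}

// activate moves a subscription from inactive to active and reports
// whether the id was known.
func (r *registry) activate(id string) bool {
	r.mu.Lock()
	defer r.mu.Unlock()
	sub, found := r.inactive[id]
	if found {
		r.active[id] = sub
		delete(r.inactive, id)
	}
	return found
}

// unsubscribe closes and removes an active subscription.
func (r *registry) unsubscribe(id string) error {
	r.mu.Lock()
	defer r.mu.Unlock()
	sub, found := r.active[id]
	if !found {
		return errNotFound
	}
	close(sub) // signals listeners that the subscription has ended
	delete(r.active, id)
	return nil
}

func main() {
	r := newRegistry()
	r.create("0x1")
	fmt.Println(r.unsubscribe("0x1")) // not active yet
	fmt.Println(r.activate("0x1"))
	fmt.Println(r.unsubscribe("0x1"))
}
```

Keeping not-yet-confirmed subscriptions in a separate map means an id cannot be cancelled or used before the client has actually received it.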
Triggering event messages
    func (api *PrivateAdminAPI) PeerEvents(ctx context.Context) (*rpc.Subscription, error) {
        // Make sure the server is running, fail otherwise
        server := api.node.Server()
        if server == nil {
            return nil, ErrNodeStopped
        }

        // Create the subscription
        // get the notifier object
        notifier, supported := rpc.NotifierFromContext(ctx)
        if !supported {
            return nil, rpc.ErrNotificationsUnsupported
        }
        // generate the subscription identity
        rpcSub := notifier.CreateSubscription()

        go func() {
            events := make(chan *p2p.PeerEvent)
            sub := server.SubscribeEvents(events)
            defer sub.Unsubscribe()

            for {
                select {
                case event :=