加入收藏 | 设为首页 | 会员中心 | 我要投稿 济南站长网 (https://www.0531zz.com/)- 科技、建站、经验、云计算、5G、大数据,站长网!
当前位置: 首页 > 教程 > 正文

Golang 1.4 net/rpc client源码介绍

发布时间:2021-11-19 14:47:05 所属栏目:教程 来源:互联网
导读:net/rpc是golang标准库提供的rpc框架,下面我们重点看下net/rpc是如何实现的。 我本机源码安装路径在/usr/local/go,这net/rpc(golang 1.4版本)涉及到的相关代码主要有: client.go server.go 首先我们先从client.go,客户端入手看: type ClientCodecinte

net/rpc是golang标准库提供的rpc框架,下面我们重点看下net/rpc是如何实现的。 我本机源码安装路径在/usr/local/go,这net/rpc(golang 1.4版本)涉及到的相关代码主要有:
type ClientCodecinterface{// WriteRequest must be safe for concurrent use by multiple goroutines.     WriteRequest(*Request,interface{}) error                                   
    ReadResponseHeader(*Response) error                                         
    ReadResponseBody(interface{}) error                                         
    Close() error                                                               
type Callstruct{ServiceMethodstring// The name of the service and method to call.    Argsinterface{}// The argument to the function (*struct).           Replyinterface{}// The reply from the function (*struct).            Error         error       // After completion, the error status.               Done          chan *Call// Strobes when call is complete.                    Tracer*Trace// tracer                                            }  
type Clientstruct{                                                               
    codec ClientCodec                                                              
    reqMutex sync.Mutex// protects following                                      
    request  Request                                                               
    mutex    sync.Mutex// protects following                                      
    seq      uint64                                                                
    pending  map[uint64]*Call                                                      
    closing  bool// user has called Close                                         
    shutdown bool// server has told us to stop                                    }
func (client *Client) send(call *Call){// client要想复用,保证线程安全,加上请求锁reqMutex是必须的。                                
    defer client.reqMutex.Unlock()// 这其实是针对map的另外一把锁,这样可以更细粒度的操作                                                    
    client.mutex.Lock()// client如果外部调用关闭,那么call也是结束状态,之后我们再分析call.done()                                                         if client.shutdown || client.closing {                                      
        call.done()return}// 重点来了!seq序号自增在把call请求暂存在pennding的map中,锁释放                                                                  
    seq := client.seq                                                           
    client.pending[seq]= call                                                  
    client.mutex.Unlock()// 这一块代码属于编码请求了,因为rpc涉及到调用具体是谁,所以需要把method传给rpc server// 这里的Seq是用于当server response的时候,seq从client->server,再从server->client,然后反查map,定位call对象使用的。                                         
    client.request.Seq= seq                                                    
    client.request.ServiceMethod= call.ServiceMethod// inject tracer,这个请忽视。。。                                                         
    client.request.Tracer= call.Tracer                                         
    err := client.codec.WriteRequest(&client.request, call.Args)if err !=nil{                                                             
        call = client.pending[seq]delete(client.pending, seq)                                             
        client.mutex.Unlock()if call !=nil{                                                        
            call.Error= err                                                    
// 我们rpc请求的时候,调用就是这个方法,传入方法名,参数,获取返回等
func (client *Client)Call(serviceMethod string, args interface{}, reply interface{}) error {// Call里面调用了client.Go,然后返回一个chan,之后阻塞等待,这是基本的同步调用
    call :=<-client.Go(serviceMethod, args, reply, make(chan *Call,1)).Donereturn call.Error}
func (client *Client)Go(serviceMethod string, args interface{}, reply interface{},done chan *Call)*Call{// 构建call对象
    call :=new(Call)                                                           
    call.ServiceMethod= serviceMethod                                          
    call.Args= args                                                            
    call.Reply= reply   
    // 如果非外部传入call,自己构建                                                       ifdone==nil{done= make(chan *Call,10)// buffered.                                }else{// If caller passes done != nil, it must arrange that                   // done has enough buffer for the number of simultaneous                // RPCs that will be using that channel.  If the channel                // is totally unbuffered, it's best not to run at all.                  if cap(done)==0{                                                     
            log.Panic("rpc: done channel is unbuffered")}}                                                                           
    call.Done=done// 发送请求                                                            
    client.send(call)return call                                                                 
// Dial connects to an RPC server at the specified network address.             
func Dial(network, address string)(*Client, error){                           
    conn, err := net.Dial(network, address)if err !=nil{returnnil, err                                                         
    }returnNewClient(conn),nil}// 我们看到其实NewClient内部使用的默认的gob编码,gobClientCodes实现了Codec的接口                              
func NewClient(conn io.ReadWriteCloser)*Client{                               
    encBuf := bufio.NewWriter(conn)                                             
    client :=&gobClientCodec{conn, gob.NewDecoder(conn), gob.NewEncoder(encBuf), encBuf}returnNewClientWithCodec(client)}// 当然也提供自定义的codec,你可以使用thrift协议、messagepack等来扩展                 // codec to encode requests and decode responses.                               
func NewClientWithCodec(codec ClientCodec)*Client{                            
    client :=&Client{                                                          
        codec:   codec,                                                         
        pending: make(map[uint64]*Call),}                                                                           
    go client.input()return client                                                               
type gobClientCodec struct{                                                    
    rwc    io.ReadWriteCloser                                                   
    dec    *gob.Decoder                                                         
    enc    *gob.Encoder                                                         
    encBuf *bufio.Writer}
func (client *Client) input(){var err error                                                             
    var response Responsefor err ==nil{// 二话不说先获取Response的头                                                    
        response =Response{}                                                   
        err = client.codec.ReadResponseHeader(&response)if err !=nil{break}// 头部中包含了序列号,用于定位pending map使用的                                                               
        seq := response.Seq// 小粒度锁删除map,获取call对象                                                
        call := client.pending[seq]delete(client.pending, seq)                                             
        client.mutex.Unlock()switch{// 如果pending找不到,那么肯定是异常了                                                              case call ==nil:// We've got no pending call. That usually means that               // WriteRequest partially failed, and call was already              // removed; response is a server telling us about an                // error reading request body. We should still attempt              // to read error body, but there's no one to give it to.            
            err = client.codec.ReadResponseBody(nil)if err !=nil{                                                     
                err = errors.New("reading error body: "+ err.Error())}// rpc 报错了,解不开什么的都有可能                                                          case response.Error!="":// We've got an error response. Give this to the request;           // any subsequent requests will get the ReadResponseBody            // error if there is one.                                           
            err = client.codec.ReadResponseBody(nil)if err !=nil{                                                     
                err = errors.New("reading error body: "+ err.Error())}                                                                   
            call.done()default:// 默认还是正常的处理,获取Body给Reply,让调用者可见                                                               
            err = client.codec.ReadResponseBody(call.Reply)if err !=nil{                                                     
                call.Error= errors.New("reading body "+ err.Error())}                                                                   
            call.done()}}// 如果有啥不可逆的异常,那么只能shutdown client了。全部退出吧                                                                         // Terminate pending calls.                                                 
    client.shutdown =true                                                      
    closing := client.closing                                                   
    if err == io.EOF {if closing {                                                            
            err =ErrShutdown}else{                                                                
            err = io.ErrUnexpectedEOF}}// 之前pending的也一个个结束吧,避免调用者都等待                                                                           for _, call := range client.pending {                                       
        call.Error= err                                                        
    client.reqMutex.Unlock()if debugLog && err != io.EOF &&!closing {                                  
        log.Println("rpc: client protocol error:", err)}}
// 把call对象传递给调用者,主要是获取内部的Error
func (call *Call)done(){select{case call.Done<- call:// ok                                                                   default:// We don't want to block here.  It is the caller's responsibility to make// sure the channel has enough buffer space. See comment in Go().       if debugLog {                                                           
            log.Println("rpc: discarding Call reply due to insufficient Done chan capacity")}}}
大致的分析就结束了,但是完整的rpc框架,还应该包括,服务发现,服务降级,服务追踪,服务容错等, 服务发现:可以使用zk,以及配合client定制的方式实现
服务降级:可以在zk中指定服务质量,以及根据回馈系统来drop request
总体来说net/rpc还是一个不错的框架,但是几个地方需要考虑,一个是全局大锁reqMutex,另外是call对象会大量创建(可否考虑call pool等)


