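Background

Requirement: given an imageUrlKeyMap whose keys are image URLs and whose values are image keys, fetch the ETag of every image and return the result as a map whose keys are image keys and whose values are ETags.

The business logic looks roughly like this in pseudocode: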

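// NewHttpClient is our own helper for building an HTTP client,
// including the transport and proxy configuration.
client := NewHttpClient()

func GetEtags(client *http.Client, imageUrlKeyMap map[string]string) map[string]string {
	keyEtagMap := make(map[string]string)

	for url, key := range imageUrlKeyMap {
		go func(url, key string) {
			// Send a HEAD request to each URL to get its ETag
			// and store it in keyEtagMap.
			resp, err := client.Do(..)
			if err != nil {
				...
			}
			defer resp.Body.Close()
			// ... other operations
		}(url, key)
	}

	// Synchronization (waiting for the goroutines, guarding keyEtagMap)
	// is omitted in this pseudocode.
	return keyEtagMap
}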

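Symptoms

  1. The number of goroutines kept growing, from 10+ at startup to 100+, and eventually into the thousands.
  2. Memory usage grew rapidly. After building the production image, pulling it again and restarting, memory soon climbed to 100 MB and reached 1 GB an hour later.
    I was sure that resp.Body.Close() was called after every response had been read.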

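Locating the code

pprof showed a large number of goroutines sitting in writeLoop and readLoop, which clearly pointed to a goroutine leak.

Both functions belong to net/http, so the leak had to be related to HTTP requests. The few functions I had just written used net/http, and after I commented them out the memory usage went back down.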
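
The same check can be scripted without a pprof UI. Below is a minimal sketch, assuming a local httptest server in place of the real image host. It dumps the goroutine profile with runtime/pprof and counts the stacks that contain persistConn.readLoop. The helper names countInGoroutineDump and idleReadLoops are made up for this illustration.

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
	"net/http/httptest"
	"runtime/pprof"
	"strings"
)

// countInGoroutineDump dumps all goroutine stacks (debug=2 prints one
// full stack per goroutine) and counts the occurrences of substr.
func countInGoroutineDump(substr string) int {
	var buf bytes.Buffer
	if err := pprof.Lookup("goroutine").WriteTo(&buf, 2); err != nil {
		return 0
	}
	return strings.Count(buf.String(), substr)
}

// idleReadLoops creates n clients, each with its own Transport, sends one
// HEAD request per client to a local test server, and reports how many
// readLoop frames show up in the goroutine dump afterwards.
func idleReadLoops(n int) int {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("ETag", `"demo"`)
	}))
	defer srv.Close()

	clients := make([]*http.Client, 0, n)
	for i := 0; i < n; i++ {
		c := &http.Client{Transport: &http.Transport{}}
		resp, err := c.Head(srv.URL)
		if err != nil {
			continue
		}
		resp.Body.Close()
		clients = append(clients, c)
	}

	count := countInGoroutineDump("net/http.(*persistConn).readLoop")

	// Clean up so the demo itself does not leak.
	for _, c := range clients {
		c.CloseIdleConnections()
	}
	return count
}

func main() {
	fmt.Println("readLoop goroutines:", idleReadLoops(5))
}
```

Even though every response body is closed, each one-shot client should leave at least one readLoop goroutine behind for its idle connection, which matches what pprof showed in production.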

Reproducible code

main.go

package main

import (
    "context"
    "crypto/tls"
    "log"
    "net/http"
    "runtime"
    "time"

    "demo/httpx"
)

// printDetailedMemStats logs the goroutine count and detailed memory statistics.
func printDetailedMemStats(iteration int) {
    var m runtime.MemStats
    runtime.ReadMemStats(&m)

    log.Printf("=== Iteration %d ===", iteration)
    log.Printf("Goroutines: %d", runtime.NumGoroutine())
    log.Printf("Allocated memory: %d MB", m.Alloc/1024/1024)
    log.Printf("System memory: %d MB", m.Sys/1024/1024)
    log.Printf("Heap allocated: %d MB", m.HeapAlloc/1024/1024)
    log.Printf("Heap system memory: %d MB", m.HeapSys/1024/1024)
    log.Printf("Heap idle: %d MB", m.HeapIdle/1024/1024)
    log.Printf("Heap in use: %d MB", m.HeapInuse/1024/1024)
    log.Printf("Heap released: %d MB", m.HeapReleased/1024/1024)
    log.Printf("GC count: %d", m.NumGC)
    log.Printf("Total GC pause: %d ms", m.PauseTotalNs/1000000)
    log.Printf("Next GC target: %d MB", m.NextGC/1024/1024)
    log.Printf("==================")
}
  
func newClient() *http.Client {  
    client := &http.Client{Timeout: 30 * time.Second}  
    tr := &http.Transport{  
       ForceAttemptHTTP2: false,  
    }  
    tr.TLSClientConfig = &tls.Config{  
       InsecureSkipVerify: true,  
    }  
    client.Transport = tr  
  
    return client  
}  
  
func newMap() map[string]string {  
    urlKeyMap := make(map[string]string)  
  
    urlKeyMap["https://img.mytheresa.com/1200/1200/66/jpeg/catalog/product/a5/P00143513_d4.jpg"] = "1"  
    urlKeyMap["https://img.mytheresa.com/1200/1200/66/jpeg/catalog/product/a5/P00143513_d3.jpg"] = "2"  
    urlKeyMap["https://img.mytheresa.com/1200/1200/66/jpeg/catalog/product/ad/P00167092_d3.jpg"] = "3"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/ad/P00167092_b1.jpg"] = "4"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d2.jpg"] = "5"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d1.jpg"] = "6"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078.jpg"] = "7"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "8"  
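    // Note: the entries below reuse the URL from the line above, so each
    // assignment overwrites the previous one and the map ends up with 8 entries.
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "9"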
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "10"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "11"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "12"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "13"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "14"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "15"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "16"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "17"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "18"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "19"  
    urlKeyMap["https://img.mytheresa.com/512/512/66/jpeg/catalog/product/56/P00168078_d3.jpg"] = "20"  
  
    return urlKeyMap  
}  
func main() {
    urlKeyMap := newMap()

    for i := 0; i < 1000; i++ {
        // A new client (and a new Transport) is created on every iteration.
        client := newClient()
        _, err := httpx.FetchEtagBatchByImageMap(context.Background(), client, urlKeyMap)
        if err != nil {
            log.Printf("err: %v", err)
        }
        if i%10 == 0 {
            // Print memory statistics every 10 iterations.
            printDetailedMemStats(i)
        }
    }
}
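
The demo/httpx package is not shown in this post. To make the reproduction easier to follow, here is a minimal sketch of what FetchEtagBatchByImageMap could look like. Only the function name and call signature come from main.go above; the body, the headEtag helper, and the demoFetch driver (which uses a local httptest server instead of the real image host) are my assumptions, not the actual implementation.

```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"net/http/httptest"
	"sync"
)

// FetchEtagBatchByImageMap sends one HEAD request per URL concurrently and
// returns a map from image key to ETag. Hypothetical stand-in for the real
// httpx function.
func FetchEtagBatchByImageMap(ctx context.Context, client *http.Client, urlKeyMap map[string]string) (map[string]string, error) {
	var (
		mu       sync.Mutex
		wg       sync.WaitGroup
		firstErr error
	)
	keyEtagMap := make(map[string]string, len(urlKeyMap))

	for url, key := range urlKeyMap {
		wg.Add(1)
		go func(url, key string) {
			defer wg.Done()
			etag, err := headEtag(ctx, client, url)
			mu.Lock() // the map and firstErr are shared across goroutines
			defer mu.Unlock()
			if err != nil {
				if firstErr == nil {
					firstErr = err
				}
				return
			}
			keyEtagMap[key] = etag
		}(url, key)
	}
	wg.Wait() // do not return before every request has finished
	return keyEtagMap, firstErr
}

// headEtag issues a single HEAD request and returns the ETag header.
func headEtag(ctx context.Context, client *http.Client, url string) (string, error) {
	req, err := http.NewRequestWithContext(ctx, http.MethodHead, url, nil)
	if err != nil {
		return "", err
	}
	resp, err := client.Do(req)
	if err != nil {
		return "", err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return "", fmt.Errorf("HEAD %s: unexpected status %s", url, resp.Status)
	}
	return resp.Header.Get("ETag"), nil
}

// demoFetch runs the function against a local test server.
func demoFetch() (map[string]string, error) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("ETag", `"etag-for-`+r.URL.Path+`"`)
	}))
	defer srv.Close()

	client := &http.Client{Transport: &http.Transport{}}
	defer client.CloseIdleConnections()

	return FetchEtagBatchByImageMap(context.Background(), client, map[string]string{
		srv.URL + "/a.jpg": "1",
		srv.URL + "/b.jpg": "2",
	})
}

func main() {
	etags, err := demoFetch()
	fmt.Println(etags, err)
}
```

Note that this sketch closes every response body correctly and still would not prevent the leak described in this post, because the leak comes from how the client is created, not from how the responses are handled.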

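Analysis

Reading the source, I learned that http.Client does all of its request sending and response reading through a Transport, and that Transport wraps each connection in a persistConn so that connections can be reused.

Source locations: net/http/client.go:211 and net/http/client.go:259

We normally call client.Do(req); internally this ends up in send(req *http.Request, rt RoundTripper, deadline time.Time).

The http.Transport struct implements the RoundTripper interface; the corresponding code is at net/http/transport.go:582.

The transport maintains its own idle connection queue. Each time a connection is needed, it first tries to take a persistConn from that queue and only queues a new dial if none is delivered (net/http/transport.go:1515):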

w := &wantConn{  
    cm:         cm,  
    key:        cm.key(),  
    ctx:        dialCtx,  
    cancelCtx:  dialCancel,  
    result:     make(chan connOrError, 1),  
    beforeDial: testHookPrePendingDial,  
    afterDial:  testHookPostPendingDial,  
}  
defer func() {  
    if err != nil {  
       w.cancel(t, err)  
    }  
}()  
  
// Queue for idle connection.  
if delivered := t.queueForIdleConn(w); !delivered {  
    t.queueForDial(w)  
}

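persistConn wraps an ordinary connection so that it can be reused. Keep-alive is enabled by default on http.Transport (DisableKeepAlives is false), so unless it is turned off the connection is not closed after a request and is kept around for later requests.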

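The earlier investigation had already shown a large number of goroutines in readLoop and writeLoop, so next let's look at the readLoop code (net/http/transport.go:2239).

The function is essentially an infinite for loop containing several select statements.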

How does it manage connection reuse?

The source is clear on this: when the response has no body, or once the body has been read to the end, readLoop puts the connection back into the idle connection pool.

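When does the for loop exit?

When the server or the client asks for the connection to be closed, or when putting the connection back for reuse fails, alive is set to false and the loop exits.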

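What does this mean for the business code?

The current logic creates a new http.Client, with its own Transport, on every call of the interface. Each Transport holds its own connection pool, and every connection in a pool has its own readLoop and writeLoop goroutines.

Around iteration 990 the goroutine count and memory usage both dropped. My guess is that the idle connections in the pools had been fully reclaimed by then.

A Transport created as a bare &http.Transport{} struct, as newClient does, leaves MaxIdleConns and IdleConnTimeout at zero, which means no limit on the total number of idle connections and no limit on how long each one may stay idle. (http.DefaultTransport is different: it sets MaxIdleConns to 100 and IdleConnTimeout to 90 seconds.)

The interface is called very frequently, so a large number of clients are created in a short time. When a call finishes, the client's connection pool and goroutines are not released immediately. They stay until IdleConnTimeout expires (no limit for a Transport built this way) or until the connection is closed, for example by the server. The garbage collector does not help here, because the running readLoop and writeLoop goroutines keep the connection and its Transport reachable. The result is continuously growing memory.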
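
To see this effect in isolation, here is a small sketch that avoids the real image host entirely. It assumes a local httptest server, creates n one-shot clients, and compares runtime.NumGoroutine() before the requests, after them, and after the idle connections are closed. The function name leakDemo is made up for this example.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"runtime"
	"time"
)

// leakDemo returns the goroutine count before any client is created,
// after n one-shot clients have each made one request, and after their
// idle connections have been closed.
func leakDemo(n int) (before, leaked, after int) {
	srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("ETag", `"demo"`)
	}))
	defer srv.Close()

	before = runtime.NumGoroutine()

	transports := make([]*http.Transport, 0, n)
	for i := 0; i < n; i++ {
		tr := &http.Transport{} // zero value: IdleConnTimeout is 0 (no limit)
		client := &http.Client{Transport: tr}
		resp, err := client.Head(srv.URL)
		if err != nil {
			continue
		}
		resp.Body.Close() // closing the body does not close the connection
		transports = append(transports, tr)
	}
	leaked = runtime.NumGoroutine()

	for _, tr := range transports {
		tr.CloseIdleConnections()
	}

	// The goroutines exit asynchronously, so poll for a short while.
	deadline := time.Now().Add(2 * time.Second)
	for {
		after = runtime.NumGoroutine()
		if after <= before || time.Now().After(deadline) {
			break
		}
		time.Sleep(10 * time.Millisecond)
	}
	return before, leaked, after
}

func main() {
	before, leaked, after := leakDemo(20)
	fmt.Printf("before=%d leaked=%d after=%d\n", before, leaked, after)
}
```

The middle number should exceed the first by at least two goroutines per client (readLoop and writeLoop), plus whatever the test server uses for its side of each connection. The exact counts vary between runs, so I have not listed an expected output.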

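Solutions

Reusing the client

Use one global client for every call of the interface. Our business code has a constraint here, though: it needs to create different transports depending on a label.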
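
One way to reconcile reuse with the per-label constraint is to keep one client per label instead of one global client. The sketch below is only an illustration of that idea: clientCache, get, and transportForLabel are hypothetical names, and the real per-label transport settings are not shown in this post.

```go
package main

import (
	"fmt"
	"net/http"
	"sync"
	"time"
)

// clientCache hands out one shared *http.Client per label, so repeated
// calls with the same label reuse the same Transport and connection pool.
type clientCache struct {
	mu           sync.Mutex
	clients      map[string]*http.Client
	newTransport func(label string) *http.Transport // hypothetical hook
}

func newClientCache(newTransport func(label string) *http.Transport) *clientCache {
	return &clientCache{
		clients:      make(map[string]*http.Client),
		newTransport: newTransport,
	}
}

// get returns the cached client for label, creating it on first use.
func (c *clientCache) get(label string) *http.Client {
	c.mu.Lock()
	defer c.mu.Unlock()
	if cl, ok := c.clients[label]; ok {
		return cl
	}
	cl := &http.Client{
		Timeout:   30 * time.Second,
		Transport: c.newTransport(label),
	}
	c.clients[label] = cl
	return cl
}

// transportForLabel is a placeholder; the real code would configure the
// transport (proxy, TLS, and so on) according to the label.
func transportForLabel(label string) *http.Transport {
	return &http.Transport{IdleConnTimeout: 90 * time.Second}
}

func main() {
	cache := newClientCache(transportForLabel)
	a := cache.get("label-a")
	b := cache.get("label-a")
	c := cache.get("label-b")
	fmt.Println("same label shares a client:", a == b)
	fmt.Println("different labels share a client:", a == c)
}
```

This bounds the number of Transports by the number of distinct labels rather than by the number of calls. If the set of labels is unbounded, the cache would need eviction, which this sketch leaves out.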

  
func main() {
    urlKeyMap := newMap()

    // The client is now created once, outside the loop.
    client := newClient()
    for i := 1; i <= 1000; i++ {
        _, err := httpx.FetchEtagBatchByImageMap(context.Background(), client, urlKeyMap)
        if err != nil {
            log.Printf("err: %v", err)
        }
        if i%10 == 0 {
            printDetailedMemStats(i)
        }
    }
}

Releasing resources

  
func main() {
    urlKeyMap := newMap()

    for i := 1; i <= 1000; i++ {
        client := newClient()
        _, err := httpx.FetchEtagBatchByImageMap(context.Background(), client, urlKeyMap)
        if err != nil {
            log.Printf("err: %v", err)
        }
        // Once the work is done, immediately close this client's idle connections.
        if tr, ok := client.Transport.(*http.Transport); ok {
            tr.CloseIdleConnections()
        }

        if i%10 == 0 {
            printDetailedMemStats(i)
        }
    }
}

Result:

[Screenshot: output after applying this change]

Limiting IdleConnTimeout

    
func newClient() *http.Client {  
    client := &http.Client{Timeout: 30 * time.Second}  
    tr := &http.Transport{  
       ForceAttemptHTTP2: false,  
       // Maximum time an idle connection may stay in the pool
       IdleConnTimeout:   10 * time.Second,
       DisableKeepAlives: false, // keep-alive stays enabled (this is the default)
    }  
    tr.TLSClientConfig = &tls.Config{  
       InsecureSkipVerify: true,  
    }  
    client.Transport = tr  
  
    return client  
}


func main() {
    urlKeyMap := newMap()

    for i := 1; i <= 1000; i++ {
        client := newClient()
        _, err := httpx.FetchEtagBatchByImageMap(context.Background(), client, urlKeyMap)
        if err != nil {
            log.Printf("err: %v", err)
        }
        // Not used in this variant: immediately closing this client's idle
        // connections. They are left to expire through IdleConnTimeout instead.
        // if tr, ok := client.Transport.(*http.Transport); ok {
        //     tr.CloseIdleConnections()
        // }
        if i%10 == 0 {
            printDetailedMemStats(i)
        }
    }
}

Result:

[Screenshot: output after applying this change]
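
A related variant, offered as a sketch rather than something I ran in production: instead of starting from an empty http.Transport, clone http.DefaultTransport and override only what differs. The clone inherits the default's limits (MaxIdleConns of 100, IdleConnTimeout of 90 seconds) along with its proxy and dial settings. The names newClientFromDefault and idleSettings are made up for this example; the TLS and HTTP/2 settings mirror newClient above.

```go
package main

import (
	"crypto/tls"
	"fmt"
	"net/http"
	"time"
)

// newClientFromDefault builds a client whose Transport starts as a copy of
// http.DefaultTransport, so idle-connection limits are set from the start.
func newClientFromDefault() *http.Client {
	tr := http.DefaultTransport.(*http.Transport).Clone()
	tr.ForceAttemptHTTP2 = false
	// Mirrors newClient in this post; note that it skips certificate verification.
	tr.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
	return &http.Client{Timeout: 30 * time.Second, Transport: tr}
}

// idleSettings reports the idle-connection limits of the cloned transport.
func idleSettings() (maxIdleConns int, idleTimeoutSeconds float64) {
	tr := newClientFromDefault().Transport.(*http.Transport)
	return tr.MaxIdleConns, tr.IdleConnTimeout.Seconds()
}

func main() {
	maxIdle, timeout := idleSettings()
	fmt.Printf("MaxIdleConns=%d IdleConnTimeout=%.0fs\n", maxIdle, timeout)
}
```

With a 90 second idle timeout, one-shot clients still hold their goroutines for up to 90 seconds each, so this only bounds the leak. Reusing the client, or calling CloseIdleConnections, remains the more direct fix.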