golang sql连接池
olang的”database/sql”是操作数据库时常用的包,这个包定义了一些sql操作的接口,具体的实现还需要不同数据库的实现,mysql比较优秀的一个驱动是:github.com/go-sql-driver/mysql
,在接口、驱动的设计上”database/sql”的实现非常优秀,对于类似设计有很多值得我们借鉴的地方,比如beego框架cache的实现模式就是借鉴了这个包的实现;”database/sql”除了定义接口外还有一个重要的功能:连接池,我们在实现其他网络通信时也可以借鉴其实现。
连接池的作用这里就不再多说了,我们先从一个简单的示例看下”database/sql”怎么用:
packagemainimport("fmt""database/sql"_"github.com/go-sql-driver/mysql")funcmain(){db,err:=sql.Open("mysql","username:password@tcp(host)/db_name?charset=utf8&allowOldPasswords=1")iferr!=nil{fmt.Println(err)return}deferdb.Close()rows,err:=db.Query("select*fromtest")forrows.Next(){//row.Scan(...)}rows.Close()}
用法很简单,首先Open打开一个数据库,然后调用Query、Exec执行数据库操作,github.com/go-sql-driver/mysql
具体实现了database/sql/driver
的接口,所以最终具体的数据库操作都是调用github.com/go-sql-driver/mysql
实现的方法,同一个数据库只需要调用一次Open即可,下面根据具体的操作分析下”database/sql”都干了哪些事。
import _ "github.com/go-sql-driver/mysql"
前面的”_”作用时不需要把该包都导进来,只执行包的init()
方法,mysql驱动正是通过这种方式注册到”database/sql”中的:
//github.com/go-sql-driver/mysql/driver.gofuncinit(){sql.Register("mysql",&MySQLDriver{})}typeMySQLDriverstruct{}func(dMySQLDriver)Open(dsnstring)(driver.Conn,error){...}
init()
通过Register()
方法将mysql驱动添加到sql.drivers
(类型:make(map[string]driver.Driver))中,MySQLDriver实现了driver.Driver
接口:
//database/sql/sql.gofuncRegister(namestring,driverdriver.Driver){driversMu.Lock()deferdriversMu.Unlock()ifdriver==nil{panic("sql:Registerdriverisnil")}if_,dup:=drivers[name];dup{panic("sql:Registercalledtwicefordriver"+name)}drivers[name]=driver}//database/sql/driver/driver.gotypeDriverinterface{//Openreturnsanewconnectiontothedatabase.//Thenameisastringinadriver-specificformat.////Openmayreturnacachedconnection(onepreviously//closed),butdoingsoisunnecessary;thesqlpackage//maintainsapoolofidleconnectionsforefficientre-use.////Thereturnedconnectionisonlyusedbyonegoroutineata//time.Open(namestring)(Conn,error)}
假如我们同时用到多种数据库,就可以通过调用sql.Register
将不同数据库的实现注册到sql.drivers
中去,用的时候再根据注册的name将对应的driver取出。
先看下连接池整体处理流程:
2.1 初始化DBdb,err:=sql.Open("mysql","username:password@tcp(host)/db_name?charset=utf8&allowOldPasswords=1")
sql.Open()
是取出对应的db,这时mysql还没有建立连接,只是初始化了一个sql.DB
结构,这是非常重要的一个结构,所有相关的数据都保存在此结构中;Open同时启动了一个connectionOpener
协程,后面再具体分析其作用。
typeDBstruct{driverdriver.Driver//数据库实现驱动dsnstring//数据库连接、配置参数信息,比如username、host、password等numCloseduint64musync.Mutex//锁,操作DB各成员时用到freeConn[]*driverConn//空闲连接connRequests[]chanconnRequest//阻塞请求队列,等连接数达到最大限制时,后续请求将插入此队列等待可用连接numOpenint//已建立连接或等待建立连接数openerChchanstruct{}//用于connectionOpenerclosedbooldepmap[finalCloser]depSetlastPutmap[*driverConn]string//stacktraceoflastconn'sput;debugonlymaxIdleint//最大空闲连接数maxOpenint//数据库最大连接数maxLifetimetime.Duration//连接最长存活期,超过这个时间连接将不再被复用cleanerChchanstruct{}}
maxIdle
(默认值2)、maxOpen
(默认值0,无限制)、maxLifetime(默认值0,永不过期)
可以分别通过SetMaxIdleConns
、SetMaxOpenConns
、SetConnMaxLifetime
设定。
上面说了Open
时是没有建立数据库连接的,只有等用的时候才会实际建立连接,获取可用连接的操作有两种策略:cachedOrNewConn(有可用空闲连接则优先使用,没有则创建)、alwaysNewConn(不管有没有空闲连接都重新创建),下面以一个query的例子看下具体的操作:
rows,err:=db.Query("select*fromtest")
database/sql/sql.go
:
func(db*DB)Query(querystring,args...interface{})(*Rows,error){varrows*Rowsvarerrerror//maxBadConnRetries=2fori:=0;i<maxBadConnRetries;i++{rows,err=db.query(query,args,cachedOrNewConn)iferr!=driver.ErrBadConn{break}}iferr==driver.ErrBadConn{returndb.query(query,args,alwaysNewConn)}returnrows,err}func(db*DB)query(querystring,args[]interface{},strategyconnReuseStrategy)(*Rows,error){ci,err:=db.conn(strategy)iferr!=nil{returnnil,err}//到这已经获取到了可用连接,下面进行具体的数据库操作returndb.queryConn(ci,ci.releaseConn,query,args)}
数据库连接由db.query()
获取:
func(db*DB)conn(strategyconnReuseStrategy)(*driverConn,error){db.mu.Lock()ifdb.closed{db.mu.Unlock()returnnil,errDBClosed}lifetime:=db.maxLifetime//从freeConn取一个空闲连接numFree:=len(db.freeConn)ifstrategy==cachedOrNewConn&&numFree>0{conn:=db.freeConn[0]copy(db.freeConn,db.freeConn[1:])db.freeConn=db.freeConn[:numFree-1]conn.inUse=truedb.mu.Unlock()ifconn.expired(lifetime){conn.Close()returnnil,driver.ErrBadConn}returnconn,nil}//如果没有空闲连接,而且当前建立的连接数已经达到最大限制则将请求加入connRequests队列,//并阻塞在这里,直到其它协程将占用的连接释放或connectionOpenner创建ifdb.maxOpen>0&&db.numOpen>=db.maxOpen{//MaketheconnRequestchannel.It'sbufferedsothatthe//connectionOpenerdoesn'tblockwhilewaitingforthereqtoberead.req:=make(chanconnRequest,1)db.connRequests=append(db.connRequests,req)db.mu.Unlock()ret,ok:=<-req//阻塞if!ok{returnnil,errDBClosed}ifret.err==nil&&ret.conn.expired(lifetime){//连接过期了ret.conn.Close()returnnil,driver.ErrBadConn}returnret.conn,ret.err}db.numOpen++//上面说了numOpen是已经建立或即将建立连接数,这里还没有建立连接,只是乐观的认为后面会成功,失败的时候再将此值减1db.mu.Unlock()ci,err:=db.driver.Open(db.dsn)//调用driver的Open方法建立连接iferr!=nil{//创建连接失败db.mu.Lock()db.numOpen--//correctforearlieroptimismdb.maybeOpenNewConnections()//通知connectionOpener协程尝试重新建立连接,否则在db.connRequests中等待的请求将一直阻塞,知道下次有连接建立db.mu.Unlock()returnnil,err}db.mu.Lock()dc:=&driverConn{db:db,createdAt:nowFunc(),ci:ci,}db.addDepLocked(dc,dc)dc.inUse=truedb.mu.Unlock()returndc,nil}
总结一下上面获取连接的过程:
* step1:首先检查下freeConn里是否有空闲连接,如果有且未超时则直接复用,返回连接,如果没有或连接已经过期则进入下一步;
* step2:检查当前已经建立及准备建立的连接数是否已经达到最大值,如果达到最大值也就意味着无法再创建新的连接了,当前请求需要在这等着连接释放,这时当前协程将创建一个channel:chan connRequest
,并将其插入db.connRequests
队列,然后阻塞在接收chan connRequest
上,等到有连接可用时这里将拿到释放的连接,检查可用后返回;如果还未达到最大值则进入下一步;
* step3:创建一个连接,首先将numOpen加1,然后再创建连接,如果等到创建完连接再把numOpen加1会导致多个协程同时创建连接时一部分会浪费,所以提前将numOpen占住,创建失败再将其减掉;如果创建连接成功则返回连接,失败则进入下一步
* step4:创建连接失败时有一个善后操作,当然并不仅仅是将最初占用的numOpen数减掉,更重要的一个操作是通知connectionOpener协程根据db.connRequests
等待的长度创建连接,这个操作的原因是:
numOpen在连接成功创建前就加了1,这时候如果numOpen已经达到最大值再有获取conn的请求将阻塞在step2,这些请求会等着先前进来的请求释放连接,假设先前进来的这些请求创建连接全部失败,那么如果它们直接返回了那些等待的请求将一直阻塞在那,因为不可能有连接释放(极限值,如果部分创建成功则会有部分释放),直到新请求进来重新成功创建连接,显然这样是有问题的,所以maybeOpenNewConnections
将通知connectionOpener根据db.connRequests
长度及可创建的最大连接数重新创建连接,然后将新创建的连接发给阻塞的请求。
注意:如果maxOpen=0
将不会有请求阻塞等待连接,所有请求只要从freeConn中取不到连接就会新创建。
另外Query
、Exec
有个重试机制,首先优先使用空闲连接,如果2次取到的连接都无效则尝试新创建连接。
获取到可用连接后将调用具体数据库的driver处理sql。
2.3 释放连接数据库连接在被使用完成后需要归还给连接池以供其它请求复用,释放连接的操作是:putConn()
:
func(db*DB)putConn(dc*driverConn,errerror){...//如果连接已经无效,则不再放入连接池iferr==driver.ErrBadConn{db.maybeOpenNewConnections()dc.Close()//这里最终将numOpen数减掉return}...//正常归还added:=db.putConnDBLocked(dc,nil)...}func(db*DB)putConnDBLocked(dc*driverConn,errerror)bool{ifdb.maxOpen>0&&db.numOpen>db.maxOpen{returnfalse}//有等待连接的请求则将连接发给它们,否则放入freeConnifc:=len(db.connRequests);c>0{req:=db.connRequests[0]//ThiscopyisO(n)butinpracticefasterthanalinkedlist.//TODO:considercompactingitdownlessoftenand//movingthebaseinstead?copy(db.connRequests,db.connRequests[1:])db.connRequests=db.connRequests[:c-1]iferr==nil{dc.inUse=true}req<-connRequest{conn:dc,err:err,}returntrue}elseiferr==nil&&!db.closed&&db.maxIdleConnsLocked()>len(db.freeConn){db.freeConn=append(db.freeConn,dc)db.startCleanerLocked()returntrue}returnfalse}
释放的过程:
* step1:首先检查下当前归还的连接在使用过程中是否发现已经无效,如果无效则不再放入连接池,然后检查下等待连接的请求数新建连接,类似获取连接时的异常处理,如果连接有效则进入下一步;
* step2:检查下当前是否有等待连接阻塞的请求,有的话将当前连接发给最早的那个请求,没有的话则再判断空闲连接数是否达到上限,没有则放入freeConn空闲连接池,达到上限则将连接关闭释放。
* step3:(只执行一次)启动connectionCleaner协程定时检查feeConn中是否有过期连接,有则剔除。
有个地方需要注意的是,Query
、Exec
操作用法有些差异:
a.Exec
(update、insert、delete等无结果集返回的操作)调用完后会自动释放连接;
b.Query
(返回sql.Rows)则不会释放连接,调用完后仍然占有连接,它将连接的所属权转移给了sql.Rows
,所以需要手动调用close归还连接,即使不用Rows也得调用rows.Close(),否则可能导致后续使用出错,如下的用法是错误的:
//错误db.SetMaxOpenConns(1)db.Query("select*fromtest")row,err:=db.Query("select*fromtest")//此操作将一直阻塞//正确db.SetMaxOpenConns(1)r,_:=db.Query("select*fromtest")r.Close()//将连接的所属权归还,释放连接row,err:=db.Query("select*fromtest")//otheroprow.Close()
声明:本站所有文章资源内容,如无特殊说明或标注,均为采集网络资源。如若本站内容侵犯了原著者的合法权益,可联系本站删除。