今天就跟大家聊聊有关Prometheus时序数据库中怎么查询数据,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

Promql

一个Promql表达式可以计算为下面四种类型:

瞬时向量(InstantVector)-一组同样时间戳的时间序列(取自不同的时间序列,例如不同机器同一时间的CPUidle)区间向量(Rangevector)-一组在一段时间范围内的时间序列标量(Scalar)-一个浮点型的数据值字符串(String)-一个简单的字符串

我们还可以在Promql中使用svm/avg等集合表达式,不过只能用在瞬时向量(Instant Vector)上面。为了阐述Prometheus的聚合计算以及篇幅原因,笔者在本篇文章只详细分析瞬时向量(Instant Vector)的执行过程。

瞬时向量(Instant Vector)

前面说到,瞬时向量是一组拥有同样时间戳的时间序列。但是实际过程中,我们对不同Endpoint采样的时间是不可能精确一致的。所以,Prometheus采取了距离指定时间戳之前最近的数据(Sample)。如下图所示:

当然,如果是距离当前时间戳1个小时的数据直观看来肯定不能纳入到我们的返回结果里面。

所以Prometheus通过一个指定的时间窗口来过滤数据(通过启动参数—query.lookback-delta指定,默认5min)。

对一条简单的Promql进行分析

好了,解释完Instant Vector概念之后,我们可以着手进行分析了。直接上一条带有聚合函数的Promql吧。

SUMBY(group)(http_requests{job="api-server",group="production"})

首先,对于这种有语法结构的语句肯定是将其Parse一把,构造成AST树了。调用

promql.ParseExpr

由于Promql较为简单,所以Prometheus直接采用了LL语法分析。在这里直接给出上述Promql的AST树结构。

Prometheus对于语法树的遍历过程都是通过vistor模式,具体到代码为:

ast.govistor设计模式funcWalk(vVisitor,nodeNode,path[]Node)error{varerrerrorifv,err=v.Visit(node,path);v==nil||err!=nil{returnerr}path=append(path,node)for_,e:=rangeChildren(node){iferr:=Walk(v,e,path);err!=nil{returnerr}}_,err=v.Visit(nil,nil)returnerr}func(finspector)Visit(nodeNode,path[]Node)(Visitor,error){iferr:=f(node,path);err!=nil{returnnil,err}returnf,nil}

通过golang里非常方便的函数式功能,直接传递求值函数inspector进行不同情况下的求值。

typeinspectorfunc(Node,[]Node)error

求值过程

具体的求值过程核心函数为:

func(ng*Engine)execEvalStmt(ctxcontext.Context,query*query,s*EvalStmt)(Value,storage.Warnings,error){......querier,warnings,err:=ng.populateSeries(ctxPrepare,query.queryable,s)//这边拿到对应序列的数据......val,err:=evaluator.Eval(s.Expr)//here聚合计算......}

populateSeries

首先通过populateSeries的计算出VectorSelector Node所对应的series(时间序列)。这里直接给出求值函数

func(nodeNode,path[]Node)error{......querier,err:=q.Querier(ctx,timestamp.FromTime(mint),timestamp.FromTime(s.End))......case*VectorSelector:.......set,wrn,err=querier.Select(params,n.LabelMatchers...)......n.unexpandedSeriesSet=set......case*MatrixSelector:......}returnnil

可以看到这个求值函数,只对VectorSelector/MatrixSelector进行操作,针对我们的Promql也就是只对叶子节点VectorSelector有效。

select

获取对应数据的核心函数就在querier.Select。我们先来看下qurier是如何得到的.

querier,err:=q.Querier(ctx,timestamp.FromTime(mint),timestamp.FromTime(s.End))

根据时间戳范围去生成querier,里面最重要的就是计算出哪些block在这个时间范围内,并将他们附着到querier里面。具体见函数

func(db*DB)Querier(mint,maxtint64)(Querier,error){for_,b:=rangedb.blocks{......//遍历blocks挑选block}//如果maxt>head.mint(即内存中的block),那么也加入到里面querier里面。ifmaxt>=db.head.MinTime(){blocks=append(blocks,&rangeHead{head:db.head,mint:mint,maxt:maxt,})}......}

知道数据在哪些block里面,我们就可以着手进行计算VectorSelector的数据了。

//labelMatchers{job:api-server}{__name__:http_requests}{group:production}querier.Select(params,n.LabelMatchers...)

有了matchers我们很容易的就能够通过倒排索引取到对应的series。为了篇幅起见,我们假设数据都在headBlock(也就是内存里面)。那么我们对于倒排的计算就如下图所示:

这样,我们的VectorSelector节点就已经有了最终的数据存储地址信息了,例如图中的memSeries refId=3和4。


如果想了解在磁盘中的数据寻址,可以详见笔者之前的博客

<<Prometheus时序数据库-磁盘中的存储结构>>

通过populateSeries找到对应的数据,那么我们就可以通过evaluator.Eval获取最终的结果了。计算采用后序遍历,等下层节点返回数据后才开始上层节点的计算。那么很自然的,我们先计算VectorSelector。

func(ev*evaluator)eval(exprExpr)Value{......case*VectorSelector://通过refId拿到对应的SeriescheckForSeriesSetExpansion(ev.ctx,e)//遍历所有的seriesfori,s:=rangee.series{//由于我们这边考虑的是instantquery,所以只循环一次forts:=ev.startTimestamp;ts<=ev.endTimestamp;ts+=ev.interval{//获取距离ts最近且小于ts的最近的sample_,v,ok:=ev.vectorSelectorSingle(it,e,ts)ifok{ifev.currentSamples<ev.maxSamples{//注意,这边的v对应的原始t被替换成了ts,也就是instantquerytimeStampss.Points=append(ss.Points,Point{V:v,T:ts})ev.currentSamples++}else{ev.error(ErrTooManySamples(env))}}......}}}

如代码注释中看到,当我们找到一个距离ts最近切小于ts的sample时候,只用这个sample的value,其时间戳则用ts(Instant Query指定的时间戳)代替。

其中vectorSelectorSingle值得我们观察一下:

func(ev*evaluator)vectorSelectorSingle(it*storage.BufferedSeriesIterator,node*VectorSelector,tsint64)(int64,float64,bool){......//这一步是获取>=refTime的数据,也就是我们instantquery传入的ok:=it.Seek(refTime)......if!ok||t>refTime{//由于我们需要的是<=refTime的数据,所以这边回退一格,由于同一memSeries同一时间的数据只有一条,所以回退的数据肯定是<=refTime的t,v,ok=it.PeekBack(1)if!ok||t<refTime-durationMilliseconds(LookbackDelta){return0,0,false}}}

就这样,我们找到了series 3和4距离Instant Query时间最近且小于这个时间的两条记录,并保留了记录的标签。这样,我们就可以在上层进行聚合。

SUM by聚合

叶子节点VectorSelector得到了对应的数据后,我们就可以对上层节点AggregateExpr进行聚合计算了。代码栈为:

evaluator.rangeEval|->evaluate.eval.func2|->evelator.aggregationgroupingkey为group

具体的函数如下图所示:

func(ev*evaluator)aggregation(opItemType,grouping[]string,withoutbool,paraminterface{},vecVector,enh*EvalNodeHelper)Vector{......//对所有的samplefor_,s:=rangevec{metric:=s.Metric......group,ok:=result[groupingKey]//如果此group不存在,则新加一个groupif!ok{......result[groupingKey]=&groupedAggregation{labels:m,//在这里我们的m=[group:production]value:s.V,mean:s.V,groupCount:1,}......}switchop{//这边就是对SUM的最终处理caseSUM:group.value+=s.V.....}}.....for_,aggr:=rangeresult{enh.out=append(enh.out,Sample{Metric:aggr.labels,Point:Point{V:aggr.value},})}......returnenh.out}

好了,有了上面的处理,我们聚合的结果就变为:

看完上述内容,你们对Prometheus时序数据库中怎么查询数据有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。