Prometheus in Practice and Source Code Analysis: scrape
The previous article covered target management and discovery. So how are these targets actually scraped? This article looks at scrape, the data collector, starting with its scrapePool type:
type scrapePool struct {
    appender storage.SampleAppender

    ctx context.Context

    mtx    sync.RWMutex
    config *config.ScrapeConfig
    client *http.Client
    // Targets and loops must always be synchronized to have the same
    // set of hashes.
    targets map[uint64]*Target
    loops   map[uint64]loop

    // Constructor for new scrape loops. This is settable for testing convenience.
    newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop
}
The scrapePool is the pool of scrape work: it holds the targets, i.e. the objects that need to be scraped, keyed by hash, together with one scrape loop per target. Recall from the previous article that the TargetManager starts each targetSet created during discovery:
go func(ts *targetSet) {
    ts.ts.Run(ctx)
    ts.sp.stop()
    tm.wg.Done()
}(ts)
The Run method lives in discovery/discovery.go:
func (ts *TargetSet) Run(ctx context.Context) {
Loop:
    for {
        // Throttle syncing to once per five seconds.
        select {
        case <-ctx.Done():
            break Loop
        case p := <-ts.providerCh:
            ts.updateProviders(ctx, p)
        case <-time.After(5 * time.Second):
        }

        select {
        case <-ctx.Done():
            break Loop
        case <-ts.syncCh:
            ts.sync()
        case p := <-ts.providerCh:
            ts.updateProviders(ctx, p)
        }
    }
}
Within that loop, the sync method:
func (ts *TargetSet) sync() {
    ts.mtx.RLock()
    var all []*config.TargetGroup
    for _, tg := range ts.tgroups {
        all = append(all, tg)
    }
    ts.mtx.RUnlock()

    ts.syncer.Sync(all)
}
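The ts.syncer that sync calls into is whatever was handed to discovery.NewTargetSet when the TargetManager wired things up; on the scraping path that is the scrapePool. The contract between the two sides is a one-method interface, sketched below after the Syncer interface in the 1.x discovery package (a sketch of the shape, not a verbatim copy):

package discovery

import "github.com/prometheus/prometheus/config"

// Syncer receives complete, current sets of discovered target groups.
type Syncer interface {
    Sync([]*config.TargetGroup)
}

scrapePool.Sync has exactly this signature, which is why the discovery side can drive it without knowing anything about scraping.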
TargetSet.sync gathers every discovered TargetGroup into the all slice and hands it to the syncer. Since the scrapePool satisfies that interface, the call lands in scrapePool.Sync, which turns each group into concrete targets:
func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
    start := time.Now()

    var all []*Target
    for _, tg := range tgs {
        targets, err := targetsFromGroup(tg, sp.config)
        if err != nil {
            log.With("err", err).Error("creating targets failed")
            continue
        }
        all = append(all, targets...)
    }
    sp.sync(all)

    targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
        time.Since(start).Seconds(),
    )
    targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}
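For reference, a TargetGroup handed to Sync is just a set of addresses plus labels shared by the whole group. Below is a hedged sketch; the field names follow the 1.x config.TargetGroup type, while the addresses and the env label are made up for illustration:

package main

import (
    "fmt"

    "github.com/prometheus/common/model"
    "github.com/prometheus/prometheus/config"
)

func main() {
    // Two made-up node_exporter addresses sharing the label env="prod".
    tg := &config.TargetGroup{
        Targets: []model.LabelSet{
            {model.AddressLabel: "10.0.0.1:9100"},
            {model.AddressLabel: "10.0.0.2:9100"},
        },
        Labels: model.LabelSet{"env": "prod"}, // labels applied to every target in the group
        Source: "static/0",                    // unique id of the providing discovery source
    }
    fmt.Println(tg.Source, len(tg.Targets))
}

targetsFromGroup then merges the group-level labels into each target and applies the job's relabeling before the result reaches sync.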
Its lowercase sync method does the real work:
func (sp *scrapePool) sync(targets []*Target) {
    sp.mtx.Lock()
    defer sp.mtx.Unlock()

    var (
        uniqueTargets = map[uint64]struct{}{}
        interval      = time.Duration(sp.config.ScrapeInterval)
        timeout       = time.Duration(sp.config.ScrapeTimeout)
    )

    for _, t := range targets {
        hash := t.hash()
        uniqueTargets[hash] = struct{}{}

        if _, ok := sp.targets[hash]; !ok {
            s := &targetScraper{Target: t, client: sp.client}
            l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)

            sp.targets[hash] = t
            sp.loops[hash] = l

            go l.run(interval, timeout, nil)
        }
    }

    var wg sync.WaitGroup

    // Stop and remove old targets and scraper loops.
    for hash := range sp.targets {
        if _, ok := uniqueTargets[hash]; !ok {
            wg.Add(1)
            go func(l loop) {
                l.stop()
                wg.Done()
            }(sp.loops[hash])

            delete(sp.loops, hash)
            delete(sp.targets, hash)
        }
    }

    wg.Wait()
}
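The pattern here is a simple reconciliation keyed by target hash: anything in the desired set that is not yet running gets a new scrape loop, anything running that is no longer desired is stopped, and the WaitGroup makes the call block until the old loops have actually exited. A minimal self-contained sketch of the same pattern (runner, reconcile and the numeric keys are illustrative names, not Prometheus code):

package main

import (
    "fmt"
    "sync"
)

type runner struct{ key uint64 }

func (r *runner) run()  { fmt.Println("start", r.key) }
func (r *runner) stop() { fmt.Println("stop", r.key) }

// reconcile brings the running set in line with the desired set of keys.
func reconcile(running map[uint64]*runner, desired []uint64) {
    want := map[uint64]struct{}{}
    for _, key := range desired {
        want[key] = struct{}{}
        if _, ok := running[key]; !ok {
            r := &runner{key: key}
            running[key] = r
            go r.run() // new key: start its loop, like go l.run(...)
        }
    }

    var wg sync.WaitGroup
    for key, r := range running {
        if _, ok := want[key]; !ok {
            wg.Add(1)
            go func(r *runner) { // stale key: stop its loop concurrently
                r.stop()
                wg.Done()
            }(r)
            delete(running, key)
        }
    }
    wg.Wait() // block until every removed loop has stopped
}

func main() {
    running := map[uint64]*runner{}
    reconcile(running, []uint64{1, 2, 3})
    reconcile(running, []uint64{2, 3, 4}) // 1 is stopped, 4 is started
}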
For every new target it starts go l.run(interval, timeout, nil), which drives the scraping:
func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
    defer close(sl.done)

    select {
    case <-time.After(sl.scraper.offset(interval)):
        // Continue after a scraping offset.
    case <-sl.ctx.Done():
        return
    }

    var last time.Time

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-sl.ctx.Done():
            return
        default:
        }

        if !sl.appender.NeedsThrottling() {
            var (
                start                 = time.Now()
                scrapeCtx, _          = context.WithTimeout(sl.ctx, timeout)
                numPostRelabelSamples = 0
            )

            // Only record after the first scrape.
            if !last.IsZero() {
                targetIntervalLength.WithLabelValues(interval.String()).Observe(
                    time.Since(last).Seconds(),
                )
            }

            samples, err := sl.scraper.scrape(scrapeCtx, start)
            if err == nil {
                numPostRelabelSamples, err = sl.append(samples)
            }
            if err != nil && errc != nil {
                errc <- err
            }

            sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err)
            last = start
        } else {
            targetSkippedScrapes.Inc()
        }

        select {
        case <-sl.ctx.Done():
            return
        case <-ticker.C:
        }
    }
}
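Before the ticker starts, the loop waits for sl.scraper.offset(interval). That offset is derived from the target's hash, so targets sharing the same interval are spread across it instead of all firing at the same instant. A rough sketch of the idea (not the exact Prometheus implementation; initialDelay is an illustrative stand-in for scraper.offset):

package main

import (
    "fmt"
    "time"
)

// initialDelay returns a stable per-target phase inside [0, interval)
// derived from the target's hash, so scrapes are staggered.
func initialDelay(hash uint64, interval time.Duration) time.Duration {
    phase := time.Duration(hash % uint64(interval))                    // this target's slot in the interval
    elapsed := time.Duration(time.Now().UnixNano() % int64(interval)) // how far into the current interval we are
    delay := phase - elapsed
    if delay < 0 {
        delay += interval // wait for the slot to come around again
    }
    return delay
}

func main() {
    for _, h := range []uint64{12345, 67890} {
        fmt.Println(h, initialDelay(h, 15*time.Second))
    }
}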
The actual collection happens in sl.scraper.scrape, which is simply an HTTP GET against the target's metrics URL, with the response decoded into samples:
func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) {
    req, err := http.NewRequest("GET", s.URL().String(), nil)
    if err != nil {
        return nil, err
    }
    req.Header.Add("Accept", acceptHeader)
    req.Header.Set("User-Agent", userAgentHeader)

    resp, err := ctxhttp.Do(ctx, s.client, req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
    }

    var (
        allSamples = make(model.Samples, 0, 200)
        decSamples = make(model.Vector, 0, 50)
    )
    sdec := expfmt.SampleDecoder{
        Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
        Opts: &expfmt.DecodeOptions{
            Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
        },
    }

    for {
        if err = sdec.Decode(&decSamples); err != nil {
            break
        }
        allSamples = append(allSamples, decSamples...)
        decSamples = decSamples[:0]
    }

    if err == io.EOF {
        // Set err to nil since it is used in the scrape health recording.
        err = nil
    }
    return allSamples, err
}
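To see the same decoding outside of Prometheus, the snippet below fetches a metrics endpoint by hand and decodes it with the same expfmt.SampleDecoder. The URL http://localhost:9100/metrics is an assumption (any exporter's /metrics endpoint works), and error handling is kept minimal:

package main

import (
    "fmt"
    "io"
    "net/http"
    "time"

    "github.com/prometheus/common/expfmt"
    "github.com/prometheus/common/model"
)

func main() {
    // Assumed local exporter; replace with any /metrics endpoint.
    resp, err := http.Get("http://localhost:9100/metrics")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    dec := expfmt.SampleDecoder{
        // ResponseFormat picks the text or protobuf decoder from the Content-Type header.
        Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
        Opts: &expfmt.DecodeOptions{
            Timestamp: model.TimeFromUnixNano(time.Now().UnixNano()),
        },
    }

    var all, batch model.Vector
    for {
        if err := dec.Decode(&batch); err != nil {
            if err != io.EOF {
                panic(err)
            }
            break
        }
        all = append(all, batch...)
        batch = batch[:0]
    }
    fmt.Printf("decoded %d samples\n", len(all))
}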
That is the whole data-scraping flow.