上一篇介绍了tagert管理和发现discovery,那个这些targets是怎样采集的呢?下面就介绍scrape,它是数据采集器。

type scrapePool struct {
    appender storage.SampleAppender

    ctx context.Context

    mtx    sync.RWMutex
    config *config.ScrapeConfig
    client *http.Client
    // Targets and loops must always be synchronized to have the same
    // set of hashes.
    targets map[uint64]*Target
    loops   map[uint64]loop

    // Constructor for new scrape loops. This is settable for testing convenience.
    newLoop func(context.Context, scraper, storage.SampleAppender, model.LabelSet, *config.ScrapeConfig) loop
}

先定义一个scrapePool采集的池,这个里面有targets,所以需要采集的对象。还记得上一篇的TargetManager里面启动discovery里面的targetset

go func(ts *targetSet) {
    ts.ts.Run(ctx)
    ts.sp.stop()
    tm.wg.Done()
}(ts)

具体Run方法discovery/discovery.go

func (ts *TargetSet) Run(ctx context.Context) {
Loop:
    for {
        // Throttle syncing to once per five seconds.
        select {
        case <-ctx.Done():
            break Loop
        case p := <-ts.providerCh:
            ts.updateProviders(ctx, p)
        case <-time.After(5 * time.Second):
        }

        select {
        case <-ctx.Done():
            break Loop
        case <-ts.syncCh:
            ts.sync()
        case p := <-ts.providerCh:
            ts.updateProviders(ctx, p)
        }
    }
}

这个里面sync方法

func (ts *TargetSet) sync() {
    ts.mtx.RLock()
    var all []*config.TargetGroup
    for _, tg := range ts.tgroups {
        all = append(all, tg)
    }
    ts.mtx.RUnlock()

    ts.syncer.Sync(all)
}

遍历TargetGroup收集所有的target放到all变量里面

func (sp *scrapePool) Sync(tgs []*config.TargetGroup) {
    start := time.Now()

    var all []*Target
    for _, tg := range tgs {
        targets, err := targetsFromGroup(tg, sp.config)
        if err != nil {
            log.With("err", err).Error("creating targets failed")
            continue
        }
        all = append(all, targets...)
    }
    sp.sync(all)

    targetSyncIntervalLength.WithLabelValues(sp.config.JobName).Observe(
        time.Since(start).Seconds(),
    )
    targetScrapePoolSyncsCounter.WithLabelValues(sp.config.JobName).Inc()
}

具体看sync方法

func (sp *scrapePool) sync(targets []*Target) {
    sp.mtx.Lock()
    defer sp.mtx.Unlock()

    var (
        uniqueTargets = map[uint64]struct{}{}
        interval      = time.Duration(sp.config.ScrapeInterval)
        timeout       = time.Duration(sp.config.ScrapeTimeout)
    )

    for _, t := range targets {
        hash := t.hash()
        uniqueTargets[hash] = struct{}{}

        if _, ok := sp.targets[hash]; !ok {
            s := &targetScraper{Target: t, client: sp.client}
            l := sp.newLoop(sp.ctx, s, sp.appender, t.Labels(), sp.config)

            sp.targets[hash] = t
            sp.loops[hash] = l

            go l.run(interval, timeout, nil)
        }
    }

    var wg sync.WaitGroup

    // Stop and remove old targets and scraper loops.
    for hash := range sp.targets {
        if _, ok := uniqueTargets[hash]; !ok {
            wg.Add(1)
            go func(l loop) {
                l.stop()
                wg.Done()
            }(sp.loops[hash])

            delete(sp.loops, hash)
            delete(sp.targets, hash)
        }
    }
    wg.Wait()
}

针对每个target启动go l.run(interval, timeout, nil)采集

func (sl *scrapeLoop) run(interval, timeout time.Duration, errc chan<- error) {
    defer close(sl.done)

    select {
    case <-time.After(sl.scraper.offset(interval)):
        // Continue after a scraping offset.
    case <-sl.ctx.Done():
        return
    }

    var last time.Time

    ticker := time.NewTicker(interval)
    defer ticker.Stop()

    for {
        select {
        case <-sl.ctx.Done():
            return
        default:
        }

        if !sl.appender.NeedsThrottling() {
            var (
                start                 = time.Now()
                scrapeCtx, _          = context.WithTimeout(sl.ctx, timeout)
                numPostRelabelSamples = 0
            )

            // Only record after the first scrape.
            if !last.IsZero() {
                targetIntervalLength.WithLabelValues(interval.String()).Observe(
                    time.Since(last).Seconds(),
                )
            }

            samples, err := sl.scraper.scrape(scrapeCtx, start)
            if err == nil {
                numPostRelabelSamples, err = sl.append(samples)
            }
            if err != nil && errc != nil {
                errc <- err
            }
            sl.report(start, time.Since(start), len(samples), numPostRelabelSamples, err)
            last = start
        } else {
            targetSkippedScrapes.Inc()
        }

        select {
        case <-sl.ctx.Done():
            return
        case <-ticker.C:
        }
    }
}

通过sl.scraper.scrape采集,这个方法就是通过GET请求去获取

func (s *targetScraper) scrape(ctx context.Context, ts time.Time) (model.Samples, error) {
    req, err := http.NewRequest("GET", s.URL().String(), nil)
    if err != nil {
        return nil, err
    }
    req.Header.Add("Accept", acceptHeader)
    req.Header.Set("User-Agent", userAgentHeader)

    resp, err := ctxhttp.Do(ctx, s.client, req)
    if err != nil {
        return nil, err
    }
    defer resp.Body.Close()

    if resp.StatusCode != http.StatusOK {
        return nil, fmt.Errorf("server returned HTTP status %s", resp.Status)
    }

    var (
        allSamples = make(model.Samples, 0, 200)
        decSamples = make(model.Vector, 0, 50)
    )
    sdec := expfmt.SampleDecoder{
        Dec: expfmt.NewDecoder(resp.Body, expfmt.ResponseFormat(resp.Header)),
        Opts: &expfmt.DecodeOptions{
            Timestamp: model.TimeFromUnixNano(ts.UnixNano()),
        },
    }

    for {
        if err = sdec.Decode(&decSamples); err != nil {
            break
        }
        allSamples = append(allSamples, decSamples...)
        decSamples = decSamples[:0]
    }

    if err == io.EOF {
        // Set err to nil since it is used in the scrape health recording.
        err = nil
    }
    return allSamples, err
}

整个数据采集的流程就是这样。

Logo

讨论HarmonyOS开发技术,专注于API与组件、DevEco Studio、测试、元服务和应用上架分发等。

更多推荐