How VictoriaMetrics shards data and groups scrape targets

1. Data sharding: vminsert picks storage nodes by hash

When multiple vmstorage nodes sit behind vminsert, vminsert shards (i.e. spreads) the incoming data across them, choosing the vmstorage node for each series via consistent hashing:

func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error {
    ctx := netstorage.GetInsertCtx()
    defer netstorage.PutInsertCtx(ctx)

    ctx.Reset()
    rowsTotal := 0
    perTenantRows := make(map[auth.Token]int)
    hasRelabeling := relabel.HasRelabeling()
    for i := range series {
        ss := &series[i]
        rowsTotal += len(ss.Points)
        ctx.Labels = ctx.Labels[:0]
        ctx.AddLabel("", ss.Metric)
        if ss.Host != "" {
            ctx.AddLabel("host", ss.Host)
        }
        if ss.Device != "" {
            ctx.AddLabel("device", ss.Device)
        }
        for _, tag := range ss.Tags {
            name, value := parser.SplitTag(tag)
            if name == "host" {
                name = "exported_host"
            }
            ctx.AddLabel(name, value)
        }
        for j := range extraLabels {
            label := &extraLabels[j]
            ctx.AddLabel(label.Name, label.Value)
        }
        if hasRelabeling {
            ctx.ApplyRelabeling()
        }
        if len(ctx.Labels) == 0 {
            // Skip metric without labels.
            continue
        }
        ctx.SortLabelsIfNeeded()
        atLocal := ctx.GetLocalAuthToken(at)
        ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], atLocal.AccountID, atLocal.ProjectID, ctx.Labels)
        // Get the storage node index this series hashes to, then send all of this
        // series' data points to that node by appending them to the node's buffer.
        storageNodeIdx := ctx.GetStorageNodeIdx(atLocal, ctx.Labels)
        for _, pt := range ss.Points {
            timestamp := pt.Timestamp()
            value := pt.Value()
            if err := ctx.WriteDataPointExt(storageNodeIdx, ctx.MetricNameBuf, timestamp, value); err != nil {
                return err
            }
        }
        perTenantRows[*atLocal] += len(ss.Points)
    }
    rowsInserted.Add(rowsTotal)
    rowsTenantInserted.MultiAdd(perTenantRows)
    rowsPerInsert.Update(float64(rowsTotal))
    return ctx.FlushBufs()
}
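
GetStorageNodeIdx is not shown above: conceptually it hashes the tenant plus the sorted label set and maps that hash onto one of the configured vmstorage nodes, so the same series always lands on the same node. Below is a minimal, self-contained sketch of this idea using jump consistent hashing; the real implementation in app/vminsert/netstorage uses its own consistent-hash structure, so treat the function names and the exact algorithm here as illustrative only.

package main

import (
    "fmt"

    "github.com/cespare/xxhash/v2"
)

// jumpHash maps a 64-bit key onto one of n buckets so that growing the
// cluster by one node only remaps ~1/n of the keys (Lamping & Veach).
func jumpHash(key uint64, n int) int {
    var b, j int64 = -1, 0
    for j < int64(n) {
        b = j
        key = key*2862933555777941757 + 1
        j = int64(float64(b+1) * (float64(int64(1)<<31) / float64((key>>33)+1)))
    }
    return int(b)
}

// pickStorageNodeIdx hashes the canonical (sorted) label set of a series
// and maps the hash onto the node list -- the role GetStorageNodeIdx plays.
func pickStorageNodeIdx(sortedLabels []byte, nodesCount int) int {
    return jumpHash(xxhash.Sum64(sortedLabels), nodesCount)
}

func main() {
    labels := []byte(`{__name__="cpu_usage",host="web-1",region="us-east"}`)
    fmt.Println("node index:", pickStorageNodeIdx(labels, 5)) // stable for a given label set
}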

// Each vmstorage node is backed by a storageNode object whose run loop keeps
// sending the metrics buffered for that node to the corresponding remote storage.
func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
    replicas := *replicationFactor
    if replicas <= 0 {
        replicas = 1
    }
    sns := snb.sns
    if replicas > len(sns) {
        replicas = len(sns)
    }

    sn.readOnlyCheckerWG.Add(1)
    go func() {
        defer sn.readOnlyCheckerWG.Done()
        sn.readOnlyChecker()
    }()
    defer sn.readOnlyCheckerWG.Wait()

    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    var br bufRows
    brLastResetTime := fasttime.UnixTimestamp()
    var waitCh <-chan struct{}
    mustStop := false
    for !mustStop {
        sn.brLock.Lock()
        bufLen := len(sn.br.buf)
        sn.brLock.Unlock()
        waitCh = nil
        if bufLen > 0 {
            // Do not sleep if sn.br.buf isn't empty.
            waitCh = closedCh
        }
        select {
        case <-sn.stopCh:
            mustStop = true
            // Make sure the sn.buf is flushed last time before returning
            // in order to send the remaining bits of data.
        case <-ticker.C:
        case <-waitCh:
        }
        sn.brLock.Lock()
        sn.br, br = br, sn.br
        sn.brCond.Broadcast()
        sn.brLock.Unlock()
        currentTime := fasttime.UnixTimestamp()
        if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
            // Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
            br.buf = append(br.buf[:0:0], br.buf...)
            brLastResetTime = currentTime
        }
        sn.checkHealth()
        if len(br.buf) == 0 {
            // Nothing to send.
            continue
        }
        // Send br to replicas storage nodes starting from snIdx.
        for !sendBufToReplicasNonblocking(snb, &br, snIdx, replicas) {
            t := timerpool.Get(200 * time.Millisecond)
            select {
            case <-sn.stopCh:
                timerpool.Put(t)
                return
            case <-t.C:
                timerpool.Put(t)
                sn.checkHealth()
            }
        }
        br.reset()
    }
}
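
Note how the loop drains the shared buffer: instead of copying under the lock, it swaps sn.br with a local bufRows (sn.br, br = br, sn.br), so producers immediately get an empty buffer while the slow network send proceeds outside the critical section. A minimal sketch of this swap pattern, with hypothetical names:

package main

import (
    "fmt"
    "sync"
    "time"
)

// swapBuffer illustrates the drain-by-swap pattern from storageNode.run:
// producers append under a short lock, and the flusher swaps the buffer
// out so the expensive send never blocks them. Names are illustrative.
type swapBuffer struct {
    mu  sync.Mutex
    buf []byte
}

func (b *swapBuffer) append(p []byte) {
    b.mu.Lock()
    b.buf = append(b.buf, p...)
    b.mu.Unlock()
}

func (b *swapBuffer) flushLoop(send func([]byte)) {
    var local []byte
    ticker := time.NewTicker(200 * time.Millisecond)
    defer ticker.Stop()
    for range ticker.C {
        b.mu.Lock()
        b.buf, local = local[:0], b.buf // swap: producers now fill an empty buffer
        b.mu.Unlock()
        if len(local) > 0 {
            send(local) // the slow send happens outside the lock
        }
    }
}

func main() {
    var b swapBuffer
    go b.flushLoop(func(p []byte) { fmt.Printf("sent %d bytes\n", len(p)) })
    for i := 0; i < 5; i++ {
        b.append([]byte("sample,"))
        time.Sleep(100 * time.Millisecond)
    }
    time.Sleep(300 * time.Millisecond) // let the final flush run
}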

// With -replicationFactor > 1, vminsert sends the same data to several distinct
// vmstorage nodes, starting from the node the series hashed to.
func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, replicas int) bool {
    usedStorageNodes := make(map[*storageNode]struct{}, replicas)
    sns := snb.sns
    for i := 0; i < replicas; i++ {
        idx := snIdx + i
        attempts := 0
        for {
            attempts++
            if attempts > len(sns) {
                if i == 0 {
                    // The data wasn't replicated at all.
                    cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
                        "re-trying to send the data soon", len(br.buf), br.rows)
                    return false
                }
                // The data is partially replicated, so just emit a warning and return true.
                // We could retry sending the data again, but this may result in uncontrolled duplicate data.
                // So it is better returning true.
                rowsIncompletelyReplicatedTotal.Add(br.rows)
                incompleteReplicationLogger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
                    "since a part of storage nodes is temporarily unavailable", i+1, replicas, *replicationFactor, len(br.buf), br.rows)
                return true
            }
            if idx >= len(sns) {
                idx %= len(sns)
            }
            sn := sns[idx]
            idx++
            if _, ok := usedStorageNodes[sn]; ok {
                // The br has been already replicated to sn. Skip it.
                continue
            }
            if !sn.sendBufRowsNonblocking(br) {
                // Cannot send data to sn. Go to the next sn.
                continue
            }
            // Successfully sent data to sn.
            usedStorageNodes[sn] = struct{}{}
            break
        }
    }
    return true
}
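
In other words, the replicas are just the next nodes after the one the series hashed to: with 5 vmstorage nodes and -replicationFactor=2, a series that hashes to node 3 goes to nodes 3 and 4, and a series that hashes to node 4 wraps around to nodes 4 and 0. A minimal sketch of this index walk, ignoring the health/retry handling above (replicaIndexes is a name made up for this sketch):

// replicaIndexes sketches which node indexes receive the copies of a buffer:
// `replicas` consecutive slots starting at snIdx, wrapping around the node
// list. The real loop above additionally skips nodes that reject the send.
func replicaIndexes(snIdx, replicas, nodesCount int) []int {
    idxs := make([]int, 0, replicas)
    for i := 0; i < replicas; i++ {
        idxs = append(idxs, (snIdx+i)%nodesCount)
    }
    return idxs
}

// replicaIndexes(3, 2, 5) -> [3 4]
// replicaIndexes(4, 2, 5) -> [4 0]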

By default, vminsert writes all samples of a given series to the same vmstorage node; with replication enabled, each series is written to multiple vmstorage nodes.

2. Target grouping: vmagent picks scrape targets by hash

vmagent splits the targets among its cluster members based on four inputs passed to each instance: the target's identifying label set (its key), the total number of cluster members, the current member's ID, and the replication factor. Together these decide whether a given target should be scraped by the current vmagent:

// Build the ScrapeWork for a target this vmagent should scrape
func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels *promutils.Labels) (*ScrapeWork, error) {
    labels := promutils.GetLabels()
    defer promutils.PutLabels(labels)

    mergeLabels(labels, swc, target, extraLabels, metaLabels)
    var originalLabels *promutils.Labels
    if !*dropOriginalLabels {
        originalLabels = labels.Clone()
    }
    labels.Labels = swc.relabelConfigs.Apply(labels.Labels, 0)
    // Remove labels starting from "__meta_" prefix according to https://www.robustperception.io/life-of-a-label/
    labels.RemoveMetaLabels()

    // Verify whether the scrape work must be skipped because of `-promscrape.cluster.*` configs.
    // Perform the verification on labels after the relabeling in order to guarantee that targets with the same set of labels
    // go to the same vmagent shard.
    // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687#issuecomment-940629495
    // When the cluster has more than one member, each member must pick only
    // the targets that hash to its own ID.
    if *clusterMembersCount > 1 {
        bb := scrapeWorkKeyBufPool.Get()
        bb.B = appendScrapeWorkKey(bb.B[:0], labels)
        // needSkipScrapeWork reports whether this target belongs to another member and must be skipped here.
        needSkip := needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B), *clusterMembersCount, *clusterReplicationFactor, clusterMemberID)
        scrapeWorkKeyBufPool.Put(bb)
        if needSkip {
            return nil, nil
        }
    }
    if !*dropOriginalLabels {
        originalLabels.Sort()
        // Reduce memory usage by interning all the strings in originalLabels.
        originalLabels.InternStrings()
    }
    if labels.Len() == 0 {
        // Drop target without labels.
        droppedTargetsMap.Register(originalLabels, swc.relabelConfigs)
        return nil, nil
    }
    scrapeURL, address := promrelabel.GetScrapeURL(labels, swc.params)
    if scrapeURL == "" {
        // Drop target without URL.
        droppedTargetsMap.Register(originalLabels, swc.relabelConfigs)
        return nil, nil
    }
    if _, err := url.Parse(scrapeURL); err != nil {
        return nil, fmt.Errorf("invalid target url=%q for job=%q: %w", scrapeURL, swc.jobName, err)
    }

    var at *auth.Token
    tenantID := labels.Get("__tenant_id__")
    if len(tenantID) > 0 {
        newToken, err := auth.NewToken(tenantID)
        if err != nil {
            return nil, fmt.Errorf("cannot parse __tenant_id__=%q for job=%q: %w", tenantID, swc.jobName, err)
        }
        at = newToken
    }

    // Read __scrape_interval__ and __scrape_timeout__ from labels.
    scrapeInterval := swc.scrapeInterval
    if s := labels.Get("__scrape_interval__"); len(s) > 0 {
        d, err := promutils.ParseDuration(s)
        if err != nil {
            return nil, fmt.Errorf("cannot parse __scrape_interval__=%q: %w", s, err)
        }
        scrapeInterval = d
    }
    scrapeTimeout := swc.scrapeTimeout
    if s := labels.Get("__scrape_timeout__"); len(s) > 0 {
        d, err := promutils.ParseDuration(s)
        if err != nil {
            return nil, fmt.Errorf("cannot parse __scrape_timeout__=%q: %w", s, err)
        }
        scrapeTimeout = d
    }
    // Read series_limit option from __series_limit__ label.
    // See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter
    seriesLimit := swc.seriesLimit
    if s := labels.Get("__series_limit__"); len(s) > 0 {
        n, err := strconv.Atoi(s)
        if err != nil {
            return nil, fmt.Errorf("cannot parse __series_limit__=%q: %w", s, err)
        }
        seriesLimit = n
    }
    // Read stream_parse option from __stream_parse__ label.
    // See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
    streamParse := swc.streamParse
    if s := labels.Get("__stream_parse__"); len(s) > 0 {
        b, err := strconv.ParseBool(s)
        if err != nil {
            return nil, fmt.Errorf("cannot parse __stream_parse__=%q: %w", s, err)
        }
        streamParse = b
    }
    // Remove labels with "__" prefix according to https://www.robustperception.io/life-of-a-label/
    labels.RemoveLabelsWithDoubleUnderscorePrefix()
    // Add missing "instance" label according to https://www.robustperception.io/life-of-a-label
    if labels.Get("instance") == "" {
        labels.Add("instance", address)
    }
    // Remove references to deleted labels, so GC could clean strings for label name and label value past len(labels.Labels).
    // This should reduce memory usage when relabeling creates big number of temporary labels with long names and/or values.
    // See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825 for details.
    labelsCopy := labels.Clone()
    // Sort labels in alphabetical order of their names.
    labelsCopy.Sort()
    // Reduce memory usage by interning all the strings in labels.
    labelsCopy.InternStrings()

    sw := &ScrapeWork{
        ScrapeURL:            scrapeURL,
        ScrapeInterval:       scrapeInterval,
        ScrapeTimeout:        scrapeTimeout,
        HonorLabels:          swc.honorLabels,
        HonorTimestamps:      swc.honorTimestamps,
        DenyRedirects:        swc.denyRedirects,
        OriginalLabels:       originalLabels,
        Labels:               labelsCopy,
        ExternalLabels:       swc.externalLabels,
        ProxyURL:             swc.proxyURL,
        ProxyAuthConfig:      swc.proxyAuthConfig,
        AuthConfig:           swc.authConfig,
        RelabelConfigs:       swc.relabelConfigs,
        MetricRelabelConfigs: swc.metricRelabelConfigs,
        SampleLimit:          swc.sampleLimit,
        DisableCompression:   swc.disableCompression,
        DisableKeepAlive:     swc.disableKeepAlive,
        StreamParse:          streamParse,
        ScrapeAlignInterval:  swc.scrapeAlignInterval,
        ScrapeOffset:         swc.scrapeOffset,
        SeriesLimit:          seriesLimit,
        NoStaleMarkers:       swc.noStaleMarkers,
        AuthToken:            at,

        jobNameOriginal: swc.jobName,
    }
    return sw, nil
}
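
For reference, the sharding above is controlled by the -promscrape.cluster.* flags. A sketch of running two vmagent shards over the same scrape config (URLs and file names are placeholders); adding -promscrape.cluster.replicationFactor=2 would make every target be scraped by both members:

# member 0 of 2
./vmagent -promscrape.config=scrape.yml \
    -promscrape.cluster.membersCount=2 \
    -promscrape.cluster.memberNum=0 \
    -remoteWrite.url=http://vminsert:8480/insert/0/prometheus/api/v1/write

# member 1 of 2
./vmagent -promscrape.config=scrape.yml \
    -promscrape.cluster.membersCount=2 \
    -promscrape.cluster.memberNum=1 \
    -remoteWrite.url=http://vminsert:8480/insert/0/prometheus/api/v1/write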

// needSkipScrapeWork reports whether this target should be scraped by another member,
// based on the member ID. The key is hashed, so it does not matter whether different
// instances see the target list in the same order: identical targets produce identical
// keys and thus identical hashes. The replica count then determines how many consecutive
// member IDs (starting from the hashed index) scrape the same target.
func needSkipScrapeWork(key string, membersCount, replicasCount, memberNum int) bool {
    if membersCount <= 1 {
        return false
    }
    h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
    idx := int(h % uint64(membersCount))
    if replicasCount < 1 {
        replicasCount = 1
    }
    for i := 0; i < replicasCount; i++ {
        if idx == memberNum {
            return false
        }
        idx++
        if idx >= membersCount {
            idx = 0
        }
    }
    return true
}
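
A worked example, assuming 4 members and -promscrape.cluster.replicationFactor=2: each target key hashes to one member index, and that member plus the following one scrape the target. The runnable snippet below mirrors the logic of needSkipScrapeWork from the owner's side, printing which members own a key (github.com/cespare/xxhash/v2 is the same hash package the real code uses; membersFor is a name made up for this sketch):

package main

import (
    "fmt"

    "github.com/cespare/xxhash/v2"
)

// membersFor mirrors needSkipScrapeWork: the target is owned by the member
// at hash(key) % membersCount plus the next replicasCount-1 members.
func membersFor(key string, membersCount, replicasCount int) []int {
    idx := int(xxhash.Sum64String(key) % uint64(membersCount))
    members := make([]int, 0, replicasCount)
    for i := 0; i < replicasCount; i++ {
        members = append(members, (idx+i)%membersCount)
    }
    return members
}

func main() {
    // The key is built from the target's labels after relabeling, so the
    // service-discovery order seen by each vmagent instance is irrelevant.
    for _, key := range []string{
        `{job="node",instance="10.0.0.1:9100"}`,
        `{job="node",instance="10.0.0.2:9100"}`,
    } {
        fmt.Printf("%s -> members %v\n", key, membersFor(key, 4, 2))
    }
}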

Note: it does not matter if different vmagent instances receive targets from service discovery in different orders; ownership is decided by hashing the target key and taking the remainder modulo the member count, so every instance computes the same assignment.

Key points:

  • By default, vminsert writes all samples of a given series to the same vmstorage node; with replication enabled, each series is written to multiple vmstorage nodes.
  • It does not matter if different vmagent instances receive targets from service discovery in different orders: ownership is decided by hashing the target key modulo the member count.