【博客670】VictoriaMetrics的数据分片与目标分组原理
vminsert默认会将同一个series的所有sample写入同一个vmstorage;开启多副本(-replicationFactor>1)时,则从该series哈希选中的节点开始,依次写入其后的多个不同vmstorage。
·
VictoriaMetrics的数据分片与目标分组原理
1、数据分片:vminsert的哈希选择存储节点
如果vminsert后端有多个vmstorage,vminsert会对数据进行分片(即打散写入):对每个series,根据其租户和标签通过一致性哈希选出一个vmstorage节点(见下面的GetStorageNodeIdx调用),该series的所有数据点都写入这个节点:
// insertRows converts parsed series into rows and buffers them for the
// vmstorage nodes.
//
// For every series it builds the label set in this order: metric name,
// optional host and device, tags, then extraLabels. It then applies
// relabeling when configured and skips series that end up without labels.
// All points of one series are written to the single storage node index
// returned by ctx.GetStorageNodeIdx for that series' tenant and labels.
// Buffered data is flushed via ctx.FlushBufs before returning.
func insertRows(at *auth.Token, series []parser.Series, extraLabels []prompbmarshal.Label) error {
	ctx := netstorage.GetInsertCtx()
	defer netstorage.PutInsertCtx(ctx)
	ctx.Reset()

	var (
		totalRows         = 0
		tenantRows        = make(map[auth.Token]int)
		relabelingEnabled = relabel.HasRelabeling()
	)
	for idx := range series {
		s := &series[idx]
		pointsCount := len(s.Points)
		// Points of series skipped below are still counted in the total.
		totalRows += pointsCount

		ctx.Labels = ctx.Labels[:0]
		ctx.AddLabel("", s.Metric)
		if s.Host != "" {
			ctx.AddLabel("host", s.Host)
		}
		if s.Device != "" {
			ctx.AddLabel("device", s.Device)
		}
		for _, tag := range s.Tags {
			tagName, tagValue := parser.SplitTag(tag)
			// A tag named "host" is renamed, presumably so it does not clash
			// with the dedicated host label added above.
			if tagName == "host" {
				tagName = "exported_host"
			}
			ctx.AddLabel(tagName, tagValue)
		}
		for k := range extraLabels {
			extra := &extraLabels[k]
			ctx.AddLabel(extra.Name, extra.Value)
		}
		if relabelingEnabled {
			ctx.ApplyRelabeling()
		}
		if len(ctx.Labels) == 0 {
			// Nothing is left to identify the series; skip it.
			continue
		}
		ctx.SortLabelsIfNeeded()

		tenant := ctx.GetLocalAuthToken(at)
		ctx.MetricNameBuf = storage.MarshalMetricNameRaw(ctx.MetricNameBuf[:0], tenant.AccountID, tenant.ProjectID, ctx.Labels)
		// Pick the storage node for this series once; every point of the
		// series is then buffered for that same node.
		nodeIdx := ctx.GetStorageNodeIdx(tenant, ctx.Labels)
		for _, point := range s.Points {
			if err := ctx.WriteDataPointExt(nodeIdx, ctx.MetricNameBuf, point.Timestamp(), point.Value()); err != nil {
				return err
			}
		}
		tenantRows[*tenant] += pointsCount
	}
	rowsInserted.Add(totalRows)
	rowsTenantInserted.MultiAdd(tenantRows)
	rowsPerInsert.Update(float64(totalRows))
	return ctx.FlushBufs()
}
// run is the sender loop of one storage node object. Rows buffered in sn.br
// (presumably filled by the insert path) are periodically swapped out and
// shipped via sendBufToReplicasNonblocking, starting at node index snIdx in
// snb.sns.
//
// The replica count comes from -replicationFactor and is clamped to
// [1, len(snb.sns)]. The loop ends once sn.stopCh is closed. After the stop
// signal, one final swap-and-send pass runs so the remaining buffered rows are
// flushed.
// NOTE(review): if sending is still being retried when sn.stopCh fires, run
// returns without sending that batch (see the retry loop below).
func (sn *storageNode) run(snb *storageNodesBucket, snIdx int) {
replicas := *replicationFactor
if replicas <= 0 {
replicas = 1
}
sns := snb.sns
if replicas > len(sns) {
replicas = len(sns)
}
// Run the read-only checker in a background goroutine. The deferred Wait
// makes run wait for that goroutine before returning.
sn.readOnlyCheckerWG.Add(1)
go func() {
defer sn.readOnlyCheckerWG.Done()
sn.readOnlyChecker()
}()
defer sn.readOnlyCheckerWG.Wait()
// Wake up at least every 200ms even when no new data arrives.
ticker := time.NewTicker(200 * time.Millisecond)
defer ticker.Stop()
var br bufRows
brLastResetTime := fasttime.UnixTimestamp()
var waitCh <-chan struct{}
mustStop := false
for !mustStop {
sn.brLock.Lock()
bufLen := len(sn.br.buf)
sn.brLock.Unlock()
waitCh = nil
if bufLen > 0 {
// Do not sleep if sn.br.buf isn't empty.
// closedCh is presumably an already-closed channel, so the select below
// returns immediately — confirm.
waitCh = closedCh
}
select {
case <-sn.stopCh:
mustStop = true
// Make sure the sn.buf is flushed last time before returning
// in order to send the remaining bits of data.
case <-ticker.C:
case <-waitCh:
}
// Swap the shared buffer with the local one under the lock, so new rows can
// be appended to sn.br while br is being sent. The Broadcast presumably wakes
// writers waiting on sn.brCond for buffer space — verify.
sn.brLock.Lock()
sn.br, br = br, sn.br
sn.brCond.Broadcast()
sn.brLock.Unlock()
currentTime := fasttime.UnixTimestamp()
// Shrink the local buffer when it uses under 1/4 of its capacity and more
// than 10 time units (seconds, assuming fasttime.UnixTimestamp returns
// seconds) have passed since the last shrink.
if len(br.buf) < cap(br.buf)/4 && currentTime-brLastResetTime > 10 {
// Free up capacity space occupied by br.buf in order to reduce memory usage after spikes.
br.buf = append(br.buf[:0:0], br.buf...)
brLastResetTime = currentTime
}
sn.checkHealth()
if len(br.buf) == 0 {
// Nothing to send.
continue
}
// Send br to replicas storage nodes starting from snIdx.
// Retry every 200ms until sendBufToReplicasNonblocking reports success, or
// return if the node is stopped meanwhile.
for !sendBufToReplicasNonblocking(snb, &br, snIdx, replicas) {
t := timerpool.Get(200 * time.Millisecond)
select {
case <-sn.stopCh:
timerpool.Put(t)
return
case <-t.C:
timerpool.Put(t)
sn.checkHealth()
}
}
br.reset()
}
}
// sendBufToReplicasNonblocking tries to place `replicas` copies of br on
// distinct storage nodes without blocking.
//
// Copy number k is first offered to node (snIdx+k) mod len(sns). If that node
// already holds a copy or refuses the data, the following nodes are tried in
// ring order, with at most len(sns) candidates per copy.
//
// It returns false only when not even the first copy could be placed, so the
// caller can retry later. If at least one copy was stored but a later copy
// could not be, it logs a warning, counts the rows as incompletely replicated
// and returns true, because retrying could produce uncontrolled duplicates.
func sendBufToReplicasNonblocking(snb *storageNodesBucket, br *bufRows, snIdx, replicas int) bool {
	nodes := snb.sns
	nodesCount := len(nodes)
	holders := make(map[*storageNode]struct{}, replicas)
	for copyNum := 0; copyNum < replicas; copyNum++ {
		placed := false
		for offset := 0; offset < nodesCount && !placed; offset++ {
			candidate := nodes[(snIdx+copyNum+offset)%nodesCount]
			if _, alreadyHolds := holders[candidate]; alreadyHolds {
				// This node already got a copy of br; look further.
				continue
			}
			if candidate.sendBufRowsNonblocking(br) {
				holders[candidate] = struct{}{}
				placed = true
			}
		}
		if placed {
			continue
		}
		if copyNum == 0 {
			// The data wasn't replicated at all.
			cannotReplicateLogger.Warnf("cannot push %d bytes with %d rows to storage nodes, since all the nodes are temporarily unavailable; "+
				"re-trying to send the data soon", len(br.buf), br.rows)
			return false
		}
		// Only part of the copies were stored. Report success anyway, since
		// re-sending could create uncontrolled duplicates.
		rowsIncompletelyReplicatedTotal.Add(br.rows)
		incompleteReplicationLogger.Warnf("cannot make a copy #%d out of %d copies according to -replicationFactor=%d for %d bytes with %d rows, "+
			"since a part of storage nodes is temporarily unavailable", copyNum+1, replicas, *replicationFactor, len(br.buf), br.rows)
		return true
	}
	return true
}
vminsert默认会将同一个series的所有sample写入同一个vmstorage;开启多副本时,则从该series哈希选中的节点开始,依次写入其后的多个不同vmstorage(跳过暂时不可用的节点)
2、目标分组:vmagent的哈希选择对象
vmagent的目标分组方式:根据4个参数——目标的标签集(key)、集群成员总数、当前成员编号和副本数——判断某个target是否应由当前vmagent抓取(仅当集群成员数大于1时才做此判断):
// getScrapeWork builds the ScrapeWork for a single discovered target.
//
// It merges target, extra and meta labels, applies swc.relabelConfigs and
// strips "__meta_*" labels. When cluster sharding is enabled
// (*clusterMembersCount > 1), it hashes the relabeled label set and returns
// (nil, nil) for targets owned by other cluster members.
//
// Targets left with no labels, or without a scrape URL, are registered in
// droppedTargetsMap and also yield (nil, nil).
//
// Per-target overrides are read from the __tenant_id__, __scrape_interval__,
// __scrape_timeout__, __series_limit__ and __stream_parse__ labels. An
// unparsable override value, or an unparsable scrape URL, returns an error.
//
// Finally, all "__*" labels are removed, an "instance" label is added from the
// target address if missing, and a sorted, interned copy of the labels is
// stored in the returned ScrapeWork.
//
// NOTE(review): targets skipped by cluster sharding are not registered in
// droppedTargetsMap. Also, originalLabels stays nil when *dropOriginalLabels
// is true, yet it is still passed to droppedTargetsMap.Register — confirm
// Register tolerates nil.
func (swc *scrapeWorkConfig) getScrapeWork(target string, extraLabels, metaLabels *promutils.Labels) (*ScrapeWork, error) {
labels := promutils.GetLabels()
defer promutils.PutLabels(labels)
mergeLabels(labels, swc, target, extraLabels, metaLabels)
var originalLabels *promutils.Labels
if !*dropOriginalLabels {
originalLabels = labels.Clone()
}
labels.Labels = swc.relabelConfigs.Apply(labels.Labels, 0)
// Remove labels starting from "__meta_" prefix according to https://www.robustperception.io/life-of-a-label/
labels.RemoveMetaLabels()
// Verify whether the scrape work must be skipped because of `-promscrape.cluster.*` configs.
// Perform the verification on labels after the relabeling in order to guarantee that targets with the same set of labels
// go to the same vmagent shard.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1687#issuecomment-940629495
// With more than one cluster member, each member keeps only the targets
// assigned to its own member ID (clusterMemberID).
if *clusterMembersCount > 1 {
bb := scrapeWorkKeyBufPool.Get()
bb.B = appendScrapeWorkKey(bb.B[:0], labels)
// needSkipScrapeWork reports whether this target belongs to other members only.
needSkip := needSkipScrapeWork(bytesutil.ToUnsafeString(bb.B), *clusterMembersCount, *clusterReplicationFactor, clusterMemberID)
scrapeWorkKeyBufPool.Put(bb)
if needSkip {
return nil, nil
}
}
if !*dropOriginalLabels {
originalLabels.Sort()
// Reduce memory usage by interning all the strings in originalLabels.
originalLabels.InternStrings()
}
if labels.Len() == 0 {
// Drop target without labels.
droppedTargetsMap.Register(originalLabels, swc.relabelConfigs)
return nil, nil
}
scrapeURL, address := promrelabel.GetScrapeURL(labels, swc.params)
if scrapeURL == "" {
// Drop target without URL.
droppedTargetsMap.Register(originalLabels, swc.relabelConfigs)
return nil, nil
}
if _, err := url.Parse(scrapeURL); err != nil {
return nil, fmt.Errorf("invalid target url=%q for job=%q: %w", scrapeURL, swc.jobName, err)
}
// An optional __tenant_id__ label selects the auth token for this target;
// at stays nil when the label is absent.
var at *auth.Token
tenantID := labels.Get("__tenant_id__")
if len(tenantID) > 0 {
newToken, err := auth.NewToken(tenantID)
if err != nil {
return nil, fmt.Errorf("cannot parse __tenant_id__=%q for job=%q: %w", tenantID, swc.jobName, err)
}
at = newToken
}
// Read __scrape_interval__ and __scrape_timeout__ from labels.
scrapeInterval := swc.scrapeInterval
if s := labels.Get("__scrape_interval__"); len(s) > 0 {
d, err := promutils.ParseDuration(s)
if err != nil {
return nil, fmt.Errorf("cannot parse __scrape_interval__=%q: %w", s, err)
}
scrapeInterval = d
}
scrapeTimeout := swc.scrapeTimeout
if s := labels.Get("__scrape_timeout__"); len(s) > 0 {
d, err := promutils.ParseDuration(s)
if err != nil {
return nil, fmt.Errorf("cannot parse __scrape_timeout__=%q: %w", s, err)
}
scrapeTimeout = d
}
// Read series_limit option from __series_limit__ label.
// See https://docs.victoriametrics.com/vmagent.html#cardinality-limiter
seriesLimit := swc.seriesLimit
if s := labels.Get("__series_limit__"); len(s) > 0 {
n, err := strconv.Atoi(s)
if err != nil {
return nil, fmt.Errorf("cannot parse __series_limit__=%q: %w", s, err)
}
seriesLimit = n
}
// Read stream_parse option from __stream_parse__ label.
// See https://docs.victoriametrics.com/vmagent.html#stream-parsing-mode
streamParse := swc.streamParse
if s := labels.Get("__stream_parse__"); len(s) > 0 {
b, err := strconv.ParseBool(s)
if err != nil {
return nil, fmt.Errorf("cannot parse __stream_parse__=%q: %w", s, err)
}
streamParse = b
}
// Remove labels with "__" prefix according to https://www.robustperception.io/life-of-a-label/
labels.RemoveLabelsWithDoubleUnderscorePrefix()
// Add missing "instance" label according to https://www.robustperception.io/life-of-a-label
if labels.Get("instance") == "" {
labels.Add("instance", address)
}
// Remove references to deleted labels, so GC could clean strings for label name and label value past len(labels.Labels).
// This should reduce memory usage when relabeling creates big number of temporary labels with long names and/or values.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/825 for details.
labelsCopy := labels.Clone()
// Sort labels in alphabetical order of their names.
labelsCopy.Sort()
// Reduce memory usage by interning all the strings in labels.
labelsCopy.InternStrings()
sw := &ScrapeWork{
ScrapeURL: scrapeURL,
ScrapeInterval: scrapeInterval,
ScrapeTimeout: scrapeTimeout,
HonorLabels: swc.honorLabels,
HonorTimestamps: swc.honorTimestamps,
DenyRedirects: swc.denyRedirects,
OriginalLabels: originalLabels,
Labels: labelsCopy,
ExternalLabels: swc.externalLabels,
ProxyURL: swc.proxyURL,
ProxyAuthConfig: swc.proxyAuthConfig,
AuthConfig: swc.authConfig,
RelabelConfigs: swc.relabelConfigs,
MetricRelabelConfigs: swc.metricRelabelConfigs,
SampleLimit: swc.sampleLimit,
DisableCompression: swc.disableCompression,
DisableKeepAlive: swc.disableKeepAlive,
StreamParse: streamParse,
ScrapeAlignInterval: swc.scrapeAlignInterval,
ScrapeOffset: swc.scrapeOffset,
SeriesLimit: seriesLimit,
NoStaleMarkers: swc.noStaleMarkers,
AuthToken: at,
jobNameOriginal: swc.jobName,
}
return sw, nil
}
// needSkipScrapeWork reports whether the cluster member memberNum must NOT
// scrape the target identified by key.
//
// The key is hashed with xxhash, and the hash modulo membersCount selects the
// first owner. The next replicasCount-1 members in ring order also own the
// target; values of replicasCount below 1 are treated as 1. Because the
// decision depends only on the key, every vmagent reaches the same result
// regardless of the order in which service discovery returned the targets.
//
// With membersCount <= 1 nothing is skipped. A memberNum outside
// [0, membersCount) never matches an owner, so such a member skips every
// target.
func needSkipScrapeWork(key string, membersCount, replicasCount, memberNum int) bool {
	if membersCount <= 1 {
		return false
	}
	owners := replicasCount
	if owners < 1 {
		owners = 1
	}
	owner := int(xxhash.Sum64(bytesutil.ToUnsafeBytes(key)) % uint64(membersCount))
	for remaining := owners; remaining > 0; remaining-- {
		if owner == memberNum {
			return false
		}
		// Advance to the next member in ring order.
		owner = (owner + 1) % membersCount
	}
	return true
}
注意:即使多个vmagent从服务发现(SD)拿到的target顺序不一致也没关系,因为判断方式是对target重标签后的标签集(key)做hash,再对成员数量取余,结果与target的顺序无关。
注意点:
- vminsert默认会将同一个series的所有sample写入同一个vmstorage;多副本时则从哈希选中的节点开始,依次写入其后的不同vmstorage
- 即使多个vmagent从服务发现(SD)拿到的target顺序不一致也没关系:判断方式是对target的标签集(key)做hash,再对成员数量取余,结果与顺序无关

DAMO开发者矩阵,由阿里巴巴达摩院和中国互联网协会联合发起,致力于探讨最前沿的技术趋势与应用成果,搭建高质量的交流与分享平台,推动技术创新与产业应用链接,围绕“人工智能与新型计算”构建开放共享的开发者生态。
更多推荐
所有评论(0)