icey129 commented on Apr 27, 2022

What is the purpose of the change

Fix a DATA RACE that is reported when running a binary built with go build -race.

Brief changelog

Pass a csListLock *sync.Mutex parameter to computeStatsData so that it locks the mutex that actually guards csList.

Verifying this change

How to reproduce

Run the following test with go test -race

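func TestNewStatsManager(t *testing.T) {
	stats := NewStatsManager()

	// Keep feeding samples for five minutes so that the periodic sampling
	// and printing goroutines started by NewStatsManager() get to run.
	st := time.Now()
	for {
		stats.increasePullTPS("rocketmq", "default", 1)
		time.Sleep(500 * time.Millisecond)
		if time.Since(st) > 5*time.Minute {
			break
		}
	}
	stats.ShutDownStat()
}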

The race detector then prints a report like this on the terminal:

WARNING: DATA RACE
Read at 0x00c000538028 by goroutine 230:
  container/list.(*List).Len()
      /usr/local/go1.17.3/src/container/list/list.go:66 +0xc7
  github.com/apache/rocketmq-client-go/v2/consumer.computeStatsData()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:143 +0xd5
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItem).printAtMinutes()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:426 +0x5e
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).printAtMinutes.func1()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:285 +0x47
  sync.(*Map).Range()
      /usr/local/go1.17.3/src/sync/map.go:346 +0x205
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).printAtMinutes()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:283 +0x3b
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func4()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:224 +0x112
  github.com/apache/rocketmq-client-go/v2/primitive.WithRecover()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/primitive/base.go:100 +0x52
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init·dwrap·21()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:215 +0x39

Previous write at 0x00c000538028 by goroutine 227:
  container/list.(*List).insert()
      /usr/local/go1.17.3/src/container/list/list.go:98 +0x518
  container/list.(*List).insertValue()
      /usr/local/go1.17.3/src/container/list/list.go:104 +0x1bf
  container/list.(*List).PushBack()
      /usr/local/go1.17.3/src/container/list/list.go:155 +0x27e
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItem).samplingInSeconds()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:389 +0x145
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).samplingInSeconds.func1()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:261 +0x47
  sync.(*Map).Range()
      /usr/local/go1.17.3/src/sync/map.go:346 +0x205
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).samplingInSeconds()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:259 +0x3b
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init.func1()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:184 +0xc6
  github.com/apache/rocketmq-client-go/v2/primitive.WithRecover()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/primitive/base.go:100 +0x52
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init·dwrap·18()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:176 +0x39

Goroutine 230 (running) created at:
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:215 +0x3d0
  github.com/apache/rocketmq-client-go/v2/consumer.newStatsItemSet()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:171 +0x51c
  github.com/apache/rocketmq-client-go/v2/consumer.NewStatsManager()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:46 +0x51d
  github.com/apache/rocketmq-client-go/v2/consumer.TestNewStatsManager()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics_test.go:217 +0x29
  testing.tRunner()
      /usr/local/go1.17.3/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/go1.17.3/src/testing/testing.go:1306 +0x47

Goroutine 227 (running) created at:
  github.com/apache/rocketmq-client-go/v2/consumer.(*statsItemSet).init()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:176 +0x12b
  github.com/apache/rocketmq-client-go/v2/consumer.newStatsItemSet()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:171 +0x51c
  github.com/apache/rocketmq-client-go/v2/consumer.NewStatsManager()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics.go:46 +0x51d
  github.com/apache/rocketmq-client-go/v2/consumer.TestNewStatsManager()
      /home/icey/lmm/v9-git/miot-services/lmm/rocketmq-client-go/consumer/statistics_test.go:217 +0x29
  testing.tRunner()
      /usr/local/go1.17.3/src/testing/testing.go:1259 +0x22f
  testing.(*T).Run·dwrap·21()
      /usr/local/go1.17.3/src/testing/testing.go:1306 +0x47

When statsItemSet.init() runs, it starts several goroutines, among them ones that call sis.samplingInSeconds() and sis.printAtMinutes().
samplingInSeconds() writes to csListMinute while holding csListMinuteLock:

func (si *statsItem) samplingInSeconds() {
	si.csListMinuteLock.Lock()
	defer si.csListMinuteLock.Unlock()
	si.csListMinute.PushBack(callSnapshot{
		timestamp: time.Now().Unix() * 1000,
		time:      atomic.LoadInt64(&si.times),
		value:     atomic.LoadInt64(&si.value),
	})
	if si.csListMinute.Len() > 7 {
		si.csListMinute.Remove(si.csListMinute.Front())
	}
}

Meanwhile, printAtMinutes() calls computeStatsData(), which reads csListMinute while holding only the package-level csListLock:

func (si *statsItem) printAtMinutes() {
	ss := computeStatsData(si.csListMinute)
	rlog.Info("Stats In One Minute, SUM: %d TPS:  AVGPT: %.2f", map[string]interface{}{
		"statsName": si.statsName,
		"statsKey":  si.statsKey,
		"SUM":       ss.sum,
		"TPS":       fmt.Sprintf("%.2f", ss.tps),
		"AVGPT":     ss.avgpt,
	})
}

var csListLock sync.Mutex

func computeStatsData(csList *list.List) statsSnapshot {
	csListLock.Lock()
	defer csListLock.Unlock()
	tps, avgpt, sum := 0.0, 0.0, int64(0)
	if csList.Len() > 0 {
		first := csList.Front().Value.(callSnapshot)
		last := csList.Back().Value.(callSnapshot)
		sum = last.value - first.value
		tps = float64(sum*1000.0) / float64(last.timestamp-first.timestamp)
		timesDiff := last.time - first.time
		if timesDiff > 0 {
			avgpt = float64(sum*1.0) / float64(timesDiff)
		}
	}
	return statsSnapshot{
		tps:   tps,
		avgpt: avgpt,
		sum:   sum,
	}
}

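samplingInSeconds() locks csListMinuteLock, whereas computeStatsData() locks the package-level csListLock. Because the writer and the reader hold different mutexes, neither one excludes the other: PushBack/Remove in the sampling goroutine and Len/Front/Back in the printing goroutine can run at the same time on the same list, which is the DATA RACE reported above.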
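The underlying rule is that a mutex only excludes goroutines that lock that same mutex. The following is a minimal, self-contained sketch of that rule, not the library's code: sampledList, push, span and runConcurrently are hypothetical names standing in for the csListMinute/csListMinuteLock pair, samplingInSeconds(), the fixed computeStatsData() and the goroutines started by init().

```go
package main

import (
	"container/list"
	"sync"
)

// sampledList is a hypothetical stand-in for statsItem's csListMinute and
// csListMinuteLock: a list together with the one mutex that guards it.
type sampledList struct {
	mu sync.Mutex
	l  *list.List
}

// push mimics samplingInSeconds(): append a sample under s.mu and keep at
// most 7 entries.
func (s *sampledList) push(v int64) {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.l.PushBack(v)
	if s.l.Len() > 7 {
		s.l.Remove(s.l.Front())
	}
}

// span mimics the fixed computeStatsData(): it locks the mutex it is given,
// which must be the same mutex push() uses, before reading the list.
func span(mu *sync.Mutex, l *list.List) int64 {
	mu.Lock()
	defer mu.Unlock()
	if l.Len() == 0 {
		return 0
	}
	return l.Back().Value.(int64) - l.Front().Value.(int64)
}

// runConcurrently starts one writer and one reader goroutine, in the same
// way that a sampling goroutine and a printing goroutine run side by side.
func runConcurrently(n int) (*sampledList, []int64) {
	s := &sampledList{l: list.New()}
	spans := make([]int64, 0, n) // only the reader goroutine appends here
	var wg sync.WaitGroup
	wg.Add(2)
	go func() { // writer
		defer wg.Done()
		for i := 1; i <= n; i++ {
			s.push(int64(i))
		}
	}()
	go func() { // reader: passes the list's own lock, as in the fix
		defer wg.Done()
		for i := 0; i < n; i++ {
			spans = append(spans, span(&s.mu, s.l))
		}
	}()
	wg.Wait()
	return s, spans
}
```

Because both goroutines lock s.mu, this runs cleanly under go run -race. Handing span a separately declared sync.Mutex instead of &s.mu recreates the original bug, and a race-enabled run of that variant should then report a race between PushBack/Remove and Len, much like the report above.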

How to fix

Changing computeStatsData() to take the mutex that guards csList as a parameter fixes the DATA RACE:

func computeStatsData(csListLock *sync.Mutex, csList *list.List) statsSnapshot {
	csListLock.Lock()
	defer csListLock.Unlock()
	tps, avgpt, sum := 0.0, 0.0, int64(0)
	if csList.Len() > 0 {
		first := csList.Front().Value.(callSnapshot)
		last := csList.Back().Value.(callSnapshot)
		sum = last.value - first.value
		tps = float64(sum*1000.0) / float64(last.timestamp-first.timestamp)
		timesDiff := last.time - first.time
		if timesDiff > 0 {
			avgpt = float64(sum*1.0) / float64(timesDiff)
		}
	}
	return statsSnapshot{
		tps:   tps,
		avgpt: avgpt,
		sum:   sum,
	}
}

Each caller then passes the lock that guards the list it reads; for printAtMinutes() that is csListMinuteLock:

func (si *statsItem) printAtMinutes() {
	ss := computeStatsData(&si.csListMinuteLock, si.csListMinute)
	rlog.Info("Stats In One Minute, SUM: %d TPS:  AVGPT: %.2f", map[string]interface{}{
		"statsName": si.statsName,
		"statsKey":  si.statsKey,
		"SUM":       ss.sum,
		"TPS":       fmt.Sprintf("%.2f", ss.tps),
		"AVGPT":     ss.avgpt,
	})
}

With this change, running the test above with go test -race no longer reports a DATA RACE.
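As a usage sketch of the new signature, with illustrative numbers: two samples taken 60000 ms apart, during which value grew by 120 and times grew by 12, give sum = 120, tps = 120 * 1000 / 60000 = 2.00 and avgpt = 120 / 12 = 10.00. The callSnapshot and statsSnapshot definitions below are local assumptions that mirror the fields used in the PR so the sketch compiles on its own, and computeStatsData is a lightly tidied copy of the fixed version; exampleSnapshot is a hypothetical helper.

```go
package main

import (
	"container/list"
	"sync"
)

// Local stand-ins for the package's types (assumed field layout).
type callSnapshot struct {
	timestamp int64 // time.Now().Unix() * 1000, i.e. milliseconds
	time      int64 // snapshot of si.times
	value     int64 // snapshot of si.value
}

type statsSnapshot struct {
	tps   float64
	avgpt float64
	sum   int64
}

// computeStatsData with the fixed signature: the caller supplies the mutex
// that guards csList, so reads are serialized with the sampling writes.
func computeStatsData(csListLock *sync.Mutex, csList *list.List) statsSnapshot {
	csListLock.Lock()
	defer csListLock.Unlock()
	tps, avgpt, sum := 0.0, 0.0, int64(0)
	if csList.Len() > 0 {
		first := csList.Front().Value.(callSnapshot)
		last := csList.Back().Value.(callSnapshot)
		sum = last.value - first.value
		tps = float64(sum*1000) / float64(last.timestamp-first.timestamp)
		if timesDiff := last.time - first.time; timesDiff > 0 {
			avgpt = float64(sum) / float64(timesDiff)
		}
	}
	return statsSnapshot{tps: tps, avgpt: avgpt, sum: sum}
}

// exampleSnapshot builds a two-sample list and computes its stats while
// holding the list's own lock, as printAtMinutes() does after the fix.
func exampleSnapshot() statsSnapshot {
	var mu sync.Mutex // plays the role of si.csListMinuteLock
	l := list.New()   // plays the role of si.csListMinute
	l.PushBack(callSnapshot{timestamp: 1000, time: 10, value: 100})
	l.PushBack(callSnapshot{timestamp: 61000, time: 22, value: 220})
	return computeStatsData(&mu, l)
}
```

The design keeps computeStatsData a plain function and makes each caller responsible for pairing a list with its own lock, which is why every call site has to pass the matching mutex.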

Follow this checklist to help us incorporate your contribution quickly and easily. Note that it would be helpful if you could finish the following five checklist items (the last one is optional) before requesting a community review of your PR.

  • [√] Make sure there is a GitHub issue filed for the change (usually before you start working on it). Trivial changes like typos do not require a GitHub issue. Your pull request should address just this issue, without pulling in other changes - one PR resolves one issue.
  • [√] Format the pull request title like [ISSUE #123] Fix UnknownException when host config not exist. Each commit in the pull request should have a meaningful subject line and body.
  • [√] Write a pull request description that is detailed enough to understand what the pull request does, how, and why.
  • [√] Write the necessary unit tests (over 80% coverage) to verify your logic correction; use mocks where a cross-module dependency exists.
  • If this contribution is large, please file an Apache Individual Contributor License Agreement.

ShannonDing requested a review from wenfengwang on June 15, 2022 02:01
ShannonDing added the enhancement (New feature or request) label on Jun 15, 2022
ShannonDing added this to the 2.1.1 milestone on Jul 21, 2022

ShannonDing left a comment:


LGTM

ShannonDing merged commit a7b2db6 into apache:master on Jul 21, 2022
Linked issue: sis.samplingInSeconds and sis.printAtMinutes() running in different goroutines cause DATA RACE