neilalexander
Member

This extends Maurice's work in #6973 by tracking sources in the Nats-Counter-Sources header. This lets us correctly rewrite Nats-Incr when a source whose counter is already non-zero is added. It also lets us identify and reconcile gaps caused by a source being removed or unavailable, by lost messages, and so on. The result is eventual consistency for sourced counters.

The additional unit test verifies this even with source streams that have MaxMsgsPerSubject set to 1, and with a mix of stream-local and sourced increments, checking all headers.

Signed-off-by: Neil Twigg <neil@nats.io>
Member

@MauriceVanVeen MauriceVanVeen left a comment

LGTM

@neilalexander
Member Author

On my machine:

goos: darwin
goarch: arm64
pkg: github.com/nats-io/nats-server/v2/server
cpu: Apple M2 Ultra
BenchmarkJetStreamCounters
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=0
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=0/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=0/Sync-24    	                   28768	     71495 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=0/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=0/Async[W:1000]-24         	  213921	      5028 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=0/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=0/Async[W:4000]-24         	  220041	      5415 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=0/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=0/Async[W:8000]-24         	  221127	      5295 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=10
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=10/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=10/Sync-24                 	   13412	     84857 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=10/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=10/Async[W:1000]-24        	   72734	     15083 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=10/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=10/Async[W:4000]-24        	   82062	     14873 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=10/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=10/Async[W:8000]-24        	   72955	     14699 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=25
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=25/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=25/Sync-24                 	   19731	     88666 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=25/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=25/Async[W:1000]-24        	   50818	     20577 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=25/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=25/Async[W:4000]-24        	   54319	     19708 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=25/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=25/Async[W:8000]-24        	   55093	     19810 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=250
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=250/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=250/Sync-24                	    6962	    160027 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=250/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=250/Async[W:1000]-24       	   10485	    112324 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=250/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=250/Async[W:4000]-24       	   10563	    111076 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=250/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1,Srcs=250/Async[W:8000]-24       	   10473	    110473 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=0
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=0/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=0/Sync-24               	   16050	     72490 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=0/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=0/Async[W:1000]-24      	  219138	      5320 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=0/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=0/Async[W:4000]-24      	  239312	      5577 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=0/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=0/Async[W:8000]-24      	  207524	      5156 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=10
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=10/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=10/Sync-24              	   22423	     84032 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=10/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=10/Async[W:1000]-24     	   83686	     14726 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=10/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=10/Async[W:4000]-24     	   80600	     15110 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=10/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=10/Async[W:8000]-24     	   84800	     15018 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=25
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=25/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=25/Sync-24              	   20132	     90216 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=25/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=25/Async[W:1000]-24     	   62925	     19739 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=25/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=25/Async[W:4000]-24     	   61057	     19520 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=25/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=25/Async[W:8000]-24     	   63314	     20131 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=250
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=250/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=250/Sync-24             	    7173	    176151 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=250/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=250/Async[W:1000]-24    	   10000	    105770 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=250/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=250/Async[W:4000]-24    	   10000	    102755 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=250/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=1,Subjs=1000,Srcs=250/Async[W:8000]-24    	   10000	    101467 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=0
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=0/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=0/Sync-24                  	    7527	    158223 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=0/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=0/Async[W:1000]-24         	   88440	     12644 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=0/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=0/Async[W:4000]-24         	  104139	     11985 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=0/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=0/Async[W:8000]-24         	  108655	     11320 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=10
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=10/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=10/Sync-24                 	    6840	    168719 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=10/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=10/Async[W:1000]-24        	   64184	     18989 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=10/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=10/Async[W:4000]-24        	   64358	     17617 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=10/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=10/Async[W:8000]-24        	   79933	     17136 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=25
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=25/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=25/Sync-24                 	    6754	    176637 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=25/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=25/Async[W:1000]-24        	   50246	     23201 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=25/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=25/Async[W:4000]-24        	   59416	     20686 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=25/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=25/Async[W:8000]-24        	   68030	     21056 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=250
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=250/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=250/Sync-24                	    3973	    293268 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=250/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=250/Async[W:1000]-24       	   15230	     77848 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=250/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=250/Async[W:4000]-24       	   18342	     67962 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=250/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1,Srcs=250/Async[W:8000]-24       	   26856	     63219 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=0
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=0/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=0/Sync-24               	    7431	    159143 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=0/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=0/Async[W:1000]-24      	   84061	     14032 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=0/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=0/Async[W:4000]-24      	  107709	     12725 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=0/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=0/Async[W:8000]-24      	   95563	     12083 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=10
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=10/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=10/Sync-24              	    7088	    167981 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=10/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=10/Async[W:1000]-24     	   45506	     24990 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=10/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=10/Async[W:4000]-24     	   61438	     21342 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=10/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=10/Async[W:8000]-24     	   64424	     19236 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=25
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=25/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=25/Sync-24              	    6838	    174968 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=25/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=25/Async[W:1000]-24     	   38289	     32684 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=25/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=25/Async[W:4000]-24     	   48086	     26198 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=25/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=25/Async[W:8000]-24     	   52893	     24143 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=250
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=250/Sync
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=250/Sync-24             	    5073	    272391 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=250/Async[W:1000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=250/Async[W:1000]-24    	   10000	    108562 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=250/Async[W:4000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=250/Async[W:4000]-24    	   14994	     84980 ns/op	         0 %error
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=250/Async[W:8000]
BenchmarkJetStreamCounters/S=File,N=3,R=3,Subjs=1000,Srcs=250/Async[W:8000]-24    	   17977	     76557 ns/op	         0 %error
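To read these numbers, ns/op converts to throughput as 1e9 / ns/op. The cost of sourcing can be expressed relative to the Srcs=0 run with otherwise identical settings. A small helper sketch follows. The function names are illustrative only, and the inputs are the two Sync figures from the R=1, Subjs=1 runs above.

```go
package main

import "fmt"

// opsPerSec converts a Go benchmark ns/op figure into operations per second.
func opsPerSec(nsPerOp float64) float64 { return 1e9 / nsPerOp }

// overhead returns how many times slower a run is than its baseline.
func overhead(baselineNs, nsPerOp float64) float64 { return nsPerOp / baselineNs }

func main() {
	// R=1, Subjs=1, Sync: Srcs=0 vs Srcs=250.
	fmt.Printf("Srcs=0:   %.0f ops/s\n", opsPerSec(71495))  // 13987 ops/s
	fmt.Printf("Srcs=250: %.0f ops/s\n", opsPerSec(160027)) // 6249 ops/s
	fmt.Printf("overhead: %.2fx\n", overhead(71495, 160027)) // 2.24x
}
```

By this measure, going from 0 to 250 sources makes each synchronous publish in that configuration a little over twice as slow.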

@neilalexander neilalexander marked this pull request as ready for review June 23, 2025 14:21
@neilalexander neilalexander requested a review from a team as a code owner June 23, 2025 14:21
@neilalexander neilalexander merged commit 7781dce into maurice/counter-crdt Jun 25, 2025
66 of 67 checks passed
@neilalexander neilalexander deleted the neil/counter-crdt-sourcing branch June 25, 2025 14:43
2 participants