@@ -25,6 +25,7 @@ import (
25
25
"github.com/influxdata/influxdb/v2/models"
26
26
"github.com/influxdata/influxdb/v2/pkg/deep"
27
27
"github.com/influxdata/influxdb/v2/pkg/slices"
28
+ "github.com/influxdata/influxdb/v2/pkg/snowflake"
28
29
"github.com/influxdata/influxdb/v2/predicate"
29
30
"github.com/influxdata/influxdb/v2/tsdb"
30
31
"github.com/influxdata/influxql"
@@ -886,6 +887,122 @@ func TestStore_FlushWALOnClose(t *testing.T) {
886
887
}
887
888
}
888
889
890
+ func TestStore_WRRSegments (t * testing.T ) {
891
+ // Check if uncommitted WRR segments are identified and cause opening the store to abort.
892
+ for _ , index := range tsdb .RegisteredIndexes () {
893
+ t .Run ("TestStore_WRRSegments_" + index , func (t * testing.T ) {
894
+ idGen := snowflake .New (0 )
895
+ generateWRRSegmentName := func () string {
896
+ return fmt .Sprintf ("_v01_%020d.wrr" , idGen .Next ())
897
+ }
898
+ createFile := func (t * testing.T , fn string ) {
899
+ t .Helper ()
900
+ require .NoError (t , os .WriteFile (fn , nil , 0666 ))
901
+ }
902
+ createWRR := func (t * testing.T , path string ) string {
903
+ t .Helper ()
904
+ fn := filepath .Join (path , generateWRRSegmentName ())
905
+ createFile (t , fn )
906
+ return fn
907
+ }
908
+ generateWRRSnapshotName := func () string {
909
+ return generateWRRSegmentName () + ".snapshot"
910
+ }
911
+ createWRRSnapshot := func (t * testing.T , path string ) string {
912
+ t .Helper ()
913
+ fn := filepath .Join (path , generateWRRSnapshotName ())
914
+ createFile (t , fn )
915
+ return fn
916
+ }
917
+ checkWRRError := func (t * testing.T , err error , wrrs ... []string ) {
918
+ t .Helper ()
919
+ require .ErrorIs (t , err , tsdb .ErrIncompatibleWAL )
920
+ require .ErrorContains (t , err , "incompatible WAL format: uncommitted WRR files found" )
921
+ // We don't know the exact order of the errors if there are multiple shards with
922
+ // uncommitted WRRs, but this will insure that all of them are included in the error
923
+ // message.
924
+ for _ , w := range wrrs {
925
+ if len (w ) > 0 {
926
+ require .ErrorContains (t , err , fmt .Sprintf ("%v" , w ))
927
+ }
928
+ }
929
+ }
930
+
931
+ s := MustOpenStore (t , index , WithWALFlushOnShutdown (true ))
932
+ defer s .Close ()
933
+
934
+ // Create shard #0 with data.
935
+ s .MustCreateShardWithData ("db0" , "rp0" , 0 ,
936
+ `cpu,host=serverA value=1 0` ,
937
+ `cpu,host=serverA value=2 10` ,
938
+ `cpu,host=serverB value=3 20` ,
939
+ )
940
+
941
+ // Create shard #1 with data.
942
+ s .MustCreateShardWithData ("db0" , "rp0" , 1 ,
943
+ `cpu,host=serverA value=1 30` ,
944
+ `cpu,host=serverC value=3 60` ,
945
+ )
946
+
947
+ sh0WALPath := filepath .Join (s .walPath , "db0" , "rp0" , "0" )
948
+ require .DirExists (t , sh0WALPath )
949
+ sh1WALPath := filepath .Join (s .walPath , "db0" , "rp0" , "1" )
950
+ require .DirExists (t , sh1WALPath )
951
+
952
+ // No WRR segments, no error
953
+ require .NoError (t , s .Reopen (t ))
954
+
955
+ // 1 uncommitted WRR segment in shard 0
956
+ var sh0Uncommitted , sh1Uncommitted []string
957
+ checkReopen := func (t * testing.T ) {
958
+ t .Helper ()
959
+ allUncommitted := [][]string {sh0Uncommitted , sh1Uncommitted }
960
+ var hasUncommitted bool
961
+ for _ , u := range allUncommitted {
962
+ if len (u ) > 0 {
963
+ hasUncommitted = true
964
+ }
965
+ }
966
+
967
+ if hasUncommitted {
968
+ checkWRRError (t , s .Reopen (t ), allUncommitted ... )
969
+ } else {
970
+ require .NoError (t , s .Reopen (t ))
971
+ }
972
+ }
973
+ sh0Uncommitted = append (sh0Uncommitted , createWRR (t , sh0WALPath ))
974
+ checkReopen (t )
975
+
976
+ // 2 uncommitted WRR segments in shard 0
977
+ sh0Uncommitted = append (sh0Uncommitted , createWRR (t , sh0WALPath ))
978
+ checkReopen (t )
979
+
980
+ // 2 uncommitted WR segments in shard 0, 1 in shard 1
981
+ sh1Uncommitted = append (sh1Uncommitted , createWRR (t , sh1WALPath ))
982
+ checkReopen (t )
983
+
984
+ // No uncommitted WRR in shard 0, 1 in shard 1
985
+ createWRRSnapshot (t , sh0WALPath )
986
+ sh0Uncommitted = nil
987
+ checkReopen (t )
988
+
989
+ // No uncommitted WRR segments
990
+ createWRRSnapshot (t , sh1WALPath )
991
+ sh1Uncommitted = nil
992
+ checkReopen (t )
993
+
994
+ // Add 1 uncommitted to shard 1
995
+ sh1Uncommitted = append (sh1Uncommitted , createWRR (t , sh1WALPath ))
996
+ checkReopen (t )
997
+
998
+ // No uncommitted WRR segments
999
+ createWRRSnapshot (t , sh1WALPath )
1000
+ sh1Uncommitted = nil
1001
+ checkReopen (t )
1002
+ })
1003
+ }
1004
+ }
1005
+
889
1006
// Test new reader blocking.
890
1007
func TestStore_NewReadersBlocked (t * testing.T ) {
891
1008
//t.Parallel()
0 commit comments