@@ -11,84 +11,160 @@ import (
11
11
"sync"
12
12
13
13
"github.com/NVIDIA/aistore/cmn"
14
+ "github.com/NVIDIA/aistore/cmn/cos"
14
15
"github.com/NVIDIA/aistore/cmn/debug"
15
16
"github.com/NVIDIA/aistore/cmn/nlog"
16
17
"github.com/NVIDIA/aistore/core"
17
18
"github.com/NVIDIA/aistore/transport"
18
19
"github.com/NVIDIA/aistore/xact"
19
20
)
20
21
21
- type SharedDM struct {
22
- dm DM
23
- rxcbs map [string ]transport.RecvObj
24
- rxmu sync.Mutex
25
- openmu sync.Mutex
22
+ // [TODO]
23
+ // - Close() vs usage (when len(rxcbs) > 0); provide xctn.onFinished() => UnregRecv
24
+ // - limitation: hdr.Opaque is exclusively reserved for the xaction ID
25
+
26
// sharedDM wraps a single DM and dispatches received objects to per-xaction
// receive callbacks.
//
// Fields:
//   - dm:    the underlying DM; init/Open/Close/RegRecv/UnregRecv are delegated to it
//   - rxcbs: receive callbacks keyed by xaction ID (see RegRecv, UnregRecv, recv)
//   - ocmu:  held by Open, Close, RegRecv, and UnregRecv (not taken by recv)
//   - rxmu:  held around every access to rxcbs, including recv
//
// Wherever both mutexes are taken, ocmu is acquired before rxmu.
+ type sharedDM struct {
27
+ dm DM
28
+ rxcbs map [string ]transport.RecvObj
29
+ ocmu sync.Mutex
30
+ rxmu sync.Mutex
26
31
}
27
32
33
+ // global
// SDM is the package-level sharedDM instance; InitSDM initializes its
// underlying dm.
34
+ var SDM sharedDM
35
+
28
36
// called upon target startup
//
// InitSDM initializes the global SDM's underlying DM, passing it:
//   - the (constant) transport name returned by trname()
//   - SDM.recv as the receive handler
//   - cmn.OwtNone
//   - an Extra carrying the given config and compression setting
// It does not open the DM; Open() does that on demand.
29
- func ( s * SharedDM ) Init (config * cmn.Config , compression string ) {
37
+ func InitSDM (config * cmn.Config , compression string ) {
30
38
extra := Extra {Config : config , Compression : compression }
31
- s .dm .init (s .trname (), s .recv , cmn .OwtNone , extra )
39
+ SDM .dm .init (SDM .trname (), SDM .recv , cmn .OwtNone , extra )
32
40
}
33
41
34
- // NOTE: constant (until and unless we run multiple shared-DMs)
35
- func (* SharedDM ) trname () string { return "shared-dm" }
42
// IsOpen reports the current value of the underlying DM's `opened` flag.
// It takes neither ocmu nor rxmu.
+ func (sdm * sharedDM ) IsOpen () bool { return sdm .dm .stage .opened .Load () }
43
+
44
+ // constant (until and unless we run multiple shared-DMs)
// trname returns the transport name; it is passed to dm.init and used as a
// prefix in log and error messages.
45
+ func (* sharedDM ) trname () string { return "shared-dm" }
46
+
47
// _already logs a warning that the shared DM is already open; Open calls it
// on both of its "already open" paths.
// NOTE(review): depth 2 presumably attributes the log line to Open's caller - confirm against nlog.
+ func (sdm * sharedDM ) _already () {
48
+ nlog .WarningDepth (2 , core .T .String (), sdm .trname (), "is already open" )
49
+ }
36
50
37
51
// called on-demand
//
// Open opens the shared DM if it is not open yet:
//   1. allocates the rxcbs map (under rxmu),
//   2. calls dm.RegRecv(), and
//   3. calls dm.Open().
// Returns nil when the DM is already open (after logging a warning) or when
// it has been opened; returns the dm.RegRecv() error otherwise.
38
- func (s * SharedDM ) Open () {
39
- s .openmu .Lock ()
40
- defer s .openmu .Unlock ()
41
- if s .dm .stage .opened .Load () {
42
- return
52
+ func (sdm * sharedDM ) Open () error {
53
// first check is made without holding ocmu
+ if sdm .IsOpen () {
54
+ sdm ._already ()
55
+ return nil
43
56
}
44
- s .rxcbs = make (map [string ]transport.RecvObj , 4 )
45
- s .dm .Open ()
46
- nlog .InfoDepth (1 , core .T .String (), "open" , s .trname ())
57
+
58
+ sdm .ocmu .Lock ()
59
// re-check under ocmu: another Open() may have completed in the meantime
+ if sdm .IsOpen () {
60
+ sdm .ocmu .Unlock ()
61
+ sdm ._already ()
62
+ return nil
63
+ }
64
+
65
// rxcbs is only ever accessed under rxmu (recv takes rxmu but not ocmu)
+ sdm .rxmu .Lock ()
66
+ sdm .rxcbs = make (map [string ]transport.RecvObj , 4 )
67
+ sdm .rxmu .Unlock ()
68
+
69
+ if err := sdm .dm .RegRecv (); err != nil {
70
// ocmu is released before logging/asserting; note that the rxcbs map
// allocated above is left in place on this path
+ sdm .ocmu .Unlock ()
71
+ nlog .ErrorDepth (1 , core .T .String (), err )
72
+ debug .AssertNoErr (err )
73
+ return err
74
+ }
75
+ sdm .dm .Open ()
76
+ sdm .ocmu .Unlock ()
77
+
78
+ nlog .InfoDepth (1 , core .T .String (), "open" , sdm .trname ())
79
+ return nil
47
80
}
48
81
49
82
// nothing running + 10m inactivity
//
// Close closes the shared DM unless receive callbacks are still registered.
// Returns:
//   - nil if the DM is not open (checked both before and after taking ocmu)
//   - an error naming one registered xaction ID and the total count when
//     rxcbs is non-empty (nothing is closed in that case)
//   - nil after rxcbs is reset, dm.Close(nil) and dm.UnregRecv() are called
50
- func (s * SharedDM ) Close () error {
51
- s .openmu .Lock ()
52
- defer s .openmu .Unlock ()
53
- if ! s .dm .stage .opened .Load () {
83
+ func (sdm * sharedDM ) Close () error {
84
// first check is made without holding ocmu
+ if ! sdm .IsOpen () {
85
+ return nil
86
+ }
87
+ sdm .ocmu .Lock ()
88
// re-check under ocmu: another Close() may have completed in the meantime
+ if ! sdm .IsOpen () {
89
+ sdm .ocmu .Unlock ()
54
90
return nil
55
91
}
56
- if len (s .rxcbs ) > 0 {
57
- return fmt .Errorf ("cannot close %s: %v" , s .trname (), s .rxcbs ) // TODO -- FIXME: cleanup
92
+
93
+ var (
94
+ xid string
95
+ l int
96
+ )
97
+ sdm .rxmu .Lock ()
98
// pick one registered xaction ID (map iteration order is unspecified),
// used only in the error message below
+ for xid = range sdm .rxcbs {
99
+ break
100
+ }
101
+ l = len (sdm .rxcbs )
102
+
103
+ if l > 0 {
104
// still in use: release both locks (rxmu first) and refuse to close
+ sdm .rxmu .Unlock ()
105
+ debug .Assert (cos .IsValidUUID (xid ), xid )
106
+ sdm .ocmu .Unlock ()
107
+ return fmt .Errorf ("cannot close %s: [%s, %d]" , sdm .trname (), xid , l )
58
108
}
59
- s .rxcbs = nil
60
- s .dm .Close (nil )
61
- nlog .InfoDepth (1 , core .T .String (), "close" , s .trname ())
109
// no registered handlers: drop the map while still holding rxmu
+ sdm .rxcbs = nil
110
+ sdm .rxmu .Unlock ()
111
+
112
// the underlying DM is closed and unregistered under ocmu (rxmu already released)
+ sdm .dm .Close (nil )
113
+ sdm .dm .UnregRecv ()
114
+ sdm .ocmu .Unlock ()
115
+
116
+ nlog .InfoDepth (1 , core .T .String (), "close" , sdm .trname ())
62
117
return nil
63
118
}
64
119
65
// RegRecv registers cb as the receive callback for xaction ID xid.
// Both ocmu and rxmu are held (in that order) for the duration.
// If the shared DM is not open, the call asserts (via debug.Assert) and
// returns without registering anything; no error is reported to the caller.
// Registering the same xid twice trips a debug assertion.
- func (s * SharedDM ) RegRecv (xid string , cb transport.RecvObj ) {
66
- s .rxmu .Lock ()
67
- debug .Assert (s .rxcbs [xid ] == nil )
68
- s .rxcbs [xid ] = cb
69
- s .rxmu .Unlock ()
120
+ func (sdm * sharedDM ) RegRecv (xid string , cb transport.RecvObj ) {
121
+ sdm .ocmu .Lock ()
122
+ sdm .rxmu .Lock ()
123
+ if ! sdm .IsOpen () {
124
// locks are released before asserting
+ sdm .rxmu .Unlock ()
125
+ sdm .ocmu .Unlock ()
126
+ debug .Assert (false , sdm .trname (), " " , "closed" )
127
+ return
128
+ }
129
+ debug .Assert (sdm .rxcbs [xid ] == nil )
130
+ sdm .rxcbs [xid ] = cb
131
+ sdm .rxmu .Unlock ()
132
+ sdm .ocmu .Unlock ()
70
133
}
71
134
72
// UnregRecv removes the receive callback registered for xaction ID xid.
// Both ocmu and rxmu are held (in that order) for the duration.
// If the shared DM is not open, the call asserts (via debug.Assert) and
// returns without touching rxcbs. Deleting an xid that is not registered
// is a no-op (Go's delete on a missing key).
- func (s * SharedDM ) UnregRecv (xid string ) {
73
- s .rxmu .Lock ()
74
- delete (s .rxcbs , xid )
75
- s .rxmu .Unlock ()
135
+ func (sdm * sharedDM ) UnregRecv (xid string ) {
136
+ sdm .ocmu .Lock ()
137
+ sdm .rxmu .Lock ()
138
+ if ! sdm .IsOpen () {
139
// locks are released before asserting
+ sdm .rxmu .Unlock ()
140
+ sdm .ocmu .Unlock ()
141
+ debug .Assert (false , sdm .trname (), " " , "closed" )
142
+ return
143
+ }
144
+ delete (sdm .rxcbs , xid )
145
+ sdm .rxmu .Unlock ()
146
+ sdm .ocmu .Unlock ()
76
147
}
77
148
78
- // NOTE (and limitation): use hdr.Opaque exclusively for xaction IDs
// recv is the receive handler passed to dm.init; it routes each received
// object to the callback registered for the xaction ID carried in hdr.Opaque.
// Returns:
//   - the incoming err, if non-nil
//   - an error if hdr.Opaque is not a valid UUID (per xact.CheckValidUUID)
//   - an error if the shared DM is not open
//   - an error if no callback is registered for the xaction ID
//   - otherwise, whatever the registered callback returns
79
- func (s * SharedDM ) recv (hdr * transport.ObjHdr , r io.Reader , err error ) error {
149
+ func (sdm * sharedDM ) recv (hdr * transport.ObjHdr , r io.Reader , err error ) error {
80
150
if err != nil {
81
151
return err
82
152
}
83
153
// hdr.Opaque is interpreted as the xaction ID
xid := string (hdr .Opaque )
84
154
if err := xact .CheckValidUUID (xid ); err != nil {
85
- return fmt .Errorf ("%s: %v" , s .trname (), err )
155
+ return fmt .Errorf ("%s: %v" , sdm .trname (), err )
86
156
}
87
- s .rxmu .Lock ()
88
- cb , ok := s .rxcbs [xid ]
89
- s .rxmu .Unlock ()
157
+
158
// only rxmu is taken here (not ocmu); it guards the rxcbs lookup
+ sdm .rxmu .Lock ()
159
+ if ! sdm .IsOpen () {
160
+ sdm .rxmu .Unlock ()
161
+ return fmt .Errorf ("%s is closed, dropping recv [xid: %s, oname: %s]" , sdm .trname (), xid , hdr .ObjName )
162
+ }
163
+ cb , ok := sdm .rxcbs [xid ]
164
+ sdm .rxmu .Unlock ()
165
+
90
166
if ! ok {
91
- return fmt .Errorf ("%s: no registered handler for xact %q " , s .trname (), xid )
167
+ return fmt .Errorf ("%s: xid %s not found, dropping recv [oname: %s] " , sdm .trname (), xid , hdr . ObjName )
92
168
}
93
169
// the callback runs after rxmu has been released
return cb (hdr , r , nil )
94
170
}
0 commit comments