Skip to content

Commit f8ee6c2

Browse files
committed
core: add shared (multi-use, multi-job) data mover ("shared streams")
* shared-DM code complete - x-moss remains * part four, prev. commit: c43a7b4 Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent a13950c commit f8ee6c2

File tree

5 files changed

+151
-61
lines changed

5 files changed

+151
-61
lines changed

ais/htrun.go

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2042,7 +2042,7 @@ func (h *htrun) regTo(url string, psi *meta.Snode, tout time.Duration, htext hte
20422042
}
20432043

20442044
// (fast path: nodes => primary)
2045-
func (h *htrun) fastKalive(smap *smapX, timeout time.Duration, ecActive bool) (string /*pid*/, http.Header, error) {
2045+
func (h *htrun) fastKalive(smap *smapX, timeout time.Duration, ecActive, dmActive bool) (string /*pid*/, http.Header, error) {
20462046
if nlog.Stopping() {
20472047
return "", http.Header{}, h.errStopping()
20482048
}
@@ -2056,10 +2056,15 @@ func (h *htrun) fastKalive(smap *smapX, timeout time.Duration, ecActive bool) (s
20562056
cargs.req = cmn.HreqArgs{Method: http.MethodPost, Base: primaryURL, Path: apc.URLPathCluKalive.Join(h.SID())}
20572057
cargs.timeout = timeout
20582058
}
2059-
if ecActive {
2059+
if ecActive || dmActive {
20602060
// (target => primary)
20612061
hdr := make(http.Header, 1)
2062-
hdr.Set(apc.HdrActiveEC, "true")
2062+
if ecActive {
2063+
hdr.Set(apc.HdrActiveEC, "true")
2064+
}
2065+
if dmActive {
2066+
hdr.Set(apc.HdrActiveDM, "true")
2067+
}
20632068
cargs.req.Header = hdr
20642069
}
20652070

ais/kalive.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/NVIDIA/aistore/ec"
2222
"github.com/NVIDIA/aistore/stats"
2323
"github.com/NVIDIA/aistore/sys"
24+
"github.com/NVIDIA/aistore/transport/bundle"
2425
)
2526

2627
const (
@@ -131,17 +132,18 @@ func (tkr *talive) cluUptime(now int64) (elapsed time.Duration) {
131132
}
132133

133134
func (tkr *talive) sendKalive(smap *smapX, timeout time.Duration, _ int64, fast bool) (pid string, status int, err error) {
135+
t := tkr.t
134136
if fast {
135137
// additionally
136-
interrupted, restarted := tkr.t.interruptedRestarted()
138+
interrupted, restarted := t.interruptedRestarted()
137139
fast = !interrupted && !restarted
138140
}
139141
if fast {
140142
debug.Assert(ec.ECM != nil)
141-
pid, _, err = tkr.t.fastKalive(smap, timeout, ec.ECM.IsActive())
143+
pid, _, err = t.fastKalive(smap, timeout, ec.ECM.IsActive(), bundle.SDM.IsOpen())
142144
return pid, 0, err
143145
}
144-
return tkr.t.slowKalive(smap, tkr.t, timeout)
146+
return t.slowKalive(smap, tkr.t, timeout)
145147
}
146148

147149
func (tkr *talive) do(config *cmn.Config) (stopped bool) {
@@ -196,12 +198,15 @@ func (pkr *palive) sendKalive(smap *smapX, timeout time.Duration, now int64, fas
196198
debug.Assert(!smap.isPrimary(pkr.p.si))
197199

198200
if fast {
199-
pid, hdr, err := pkr.p.fastKalive(smap, timeout, false /*ec active*/)
201+
pid, hdr, err := pkr.p.fastKalive(smap, timeout, false, false /*shared streams*/)
200202
if err == nil {
201203
// (shared streams; EC streams)
202204
if pkr.p.ec.isActive(hdr) {
203205
pkr.p.ec.setActive(now)
204206
}
207+
if pkr.p.dm.isActive(hdr) {
208+
pkr.p.dm.setActive(now)
209+
}
205210
}
206211
return pid, 0, err
207212
}

ais/target.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,6 @@ type (
7171
fsprg fsprungroup
7272
reb *reb.Reb
7373
res *res.Res
74-
sdm *bundle.SharedDM
7574
txns txns
7675
regstate regstate
7776
}
@@ -294,10 +293,9 @@ func (t *target) init(config *cmn.Config) {
294293

295294
t.fsprg.init(t, newVol) // subgroup of the daemon.rg rungroup
296295

297-
sc := transport.Init(ts) // init transport sub-system
298-
daemon.rg.add(sc) // new stream collector
299-
t.sdm = &bundle.SharedDM{} // shared streams
300-
t.sdm.Init(config, apc.CompressNever)
296+
sc := transport.Init(ts) // init transport sub-system
297+
daemon.rg.add(sc) // new stream collector
298+
bundle.InitSDM(config, apc.CompressNever) // shared streams
301299

302300
t.fshc = health.NewFSHC(t)
303301

ais/tgtec.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,10 @@ import (
2121
"github.com/NVIDIA/aistore/core/meta"
2222
"github.com/NVIDIA/aistore/ec"
2323
"github.com/NVIDIA/aistore/hk"
24+
"github.com/NVIDIA/aistore/transport/bundle"
2425
"github.com/NVIDIA/aistore/xact/xreg"
2526
)
2627

27-
var errCloseStreams = errors.New("EC is currently active, cannot close streams")
28-
2928
func (t *target) ecHandler(w http.ResponseWriter, r *http.Request) {
3029
switch r.Method {
3130
case http.MethodGet:
@@ -144,17 +143,24 @@ func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) {
144143
return
145144
}
146145
if ec.ECM.IsActive() {
147-
t.writeErr(w, r, errCloseStreams)
146+
t.writeErr(w, r, errors.New("EC is active, cannot close"))
148147
return
149148
}
150149
nlog.Infoln(t.String(), "hk-postpone", action)
151150
hk.Reg(hkname, closeEc, postpone)
152151

153-
// TODO -- FIXME: simplified (compare w/ above); refactor as toggle-streams-...
154152
case apc.ActDmOpen:
155-
t.sdm.Open()
153+
if err := bundle.SDM.Open(); err != nil {
154+
t.writeErr(w, r, err)
155+
}
156156
case apc.ActDmClose:
157-
t.sdm.Close()
157+
if !t.ensureIntraControl(w, r, true /* from primary */) {
158+
return
159+
}
160+
// TODO: consider delaying via hk (see above)
161+
if err := bundle.SDM.Close(); err != nil {
162+
t.writeErr(w, r, err)
163+
}
158164

159165
default:
160166
t.writeErr(w, r, errActEc(action))
@@ -163,7 +169,7 @@ func (t *target) httpecpost(w http.ResponseWriter, r *http.Request) {
163169

164170
func closeEc(int64) time.Duration {
165171
if ec.ECM.IsActive() {
166-
nlog.Warningln("hk-cb:", errCloseStreams)
172+
nlog.Warningln("hk-cb: cannot close EC streams")
167173
} else {
168174
ec.ECM.CloseStreams(false /*with refc*/)
169175
}

transport/bundle/shared_dm.go

Lines changed: 118 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -11,84 +11,160 @@ import (
1111
"sync"
1212

1313
"github.com/NVIDIA/aistore/cmn"
14+
"github.com/NVIDIA/aistore/cmn/cos"
1415
"github.com/NVIDIA/aistore/cmn/debug"
1516
"github.com/NVIDIA/aistore/cmn/nlog"
1617
"github.com/NVIDIA/aistore/core"
1718
"github.com/NVIDIA/aistore/transport"
1819
"github.com/NVIDIA/aistore/xact"
1920
)
2021

21-
type SharedDM struct {
22-
dm DM
23-
rxcbs map[string]transport.RecvObj
24-
rxmu sync.Mutex
25-
openmu sync.Mutex
22+
// [TODO]
23+
// - Close() vs usage (when len(rxcbs) > 0); provide xctn.onFinished() => UnregRecv
24+
// - limitation: hdr.Opaque is exclusively reserved xaction ID
25+
26+
type sharedDM struct {
27+
dm DM
28+
rxcbs map[string]transport.RecvObj
29+
ocmu sync.Mutex
30+
rxmu sync.Mutex
2631
}
2732

33+
// global
34+
var SDM sharedDM
35+
2836
// called upon target startup
29-
func (s *SharedDM) Init(config *cmn.Config, compression string) {
37+
func InitSDM(config *cmn.Config, compression string) {
3038
extra := Extra{Config: config, Compression: compression}
31-
s.dm.init(s.trname(), s.recv, cmn.OwtNone, extra)
39+
SDM.dm.init(SDM.trname(), SDM.recv, cmn.OwtNone, extra)
3240
}
3341

34-
// NOTE: constant (until and unless we run multiple shared-DMs)
35-
func (*SharedDM) trname() string { return "shared-dm" }
42+
func (sdm *sharedDM) IsOpen() bool { return sdm.dm.stage.opened.Load() }
43+
44+
// constant (until and unless we run multiple shared-DMs)
45+
func (*sharedDM) trname() string { return "shared-dm" }
46+
47+
func (sdm *sharedDM) _already() {
48+
nlog.WarningDepth(2, core.T.String(), sdm.trname(), "is already open")
49+
}
3650

3751
// called on-demand
38-
func (s *SharedDM) Open() {
39-
s.openmu.Lock()
40-
defer s.openmu.Unlock()
41-
if s.dm.stage.opened.Load() {
42-
return
52+
func (sdm *sharedDM) Open() error {
53+
if sdm.IsOpen() {
54+
sdm._already()
55+
return nil
4356
}
44-
s.rxcbs = make(map[string]transport.RecvObj, 4)
45-
s.dm.Open()
46-
nlog.InfoDepth(1, core.T.String(), "open", s.trname())
57+
58+
sdm.ocmu.Lock()
59+
if sdm.IsOpen() {
60+
sdm.ocmu.Unlock()
61+
sdm._already()
62+
return nil
63+
}
64+
65+
sdm.rxmu.Lock()
66+
sdm.rxcbs = make(map[string]transport.RecvObj, 4)
67+
sdm.rxmu.Unlock()
68+
69+
if err := sdm.dm.RegRecv(); err != nil {
70+
sdm.ocmu.Unlock()
71+
nlog.ErrorDepth(1, core.T.String(), err)
72+
debug.AssertNoErr(err)
73+
return err
74+
}
75+
sdm.dm.Open()
76+
sdm.ocmu.Unlock()
77+
78+
nlog.InfoDepth(1, core.T.String(), "open", sdm.trname())
79+
return nil
4780
}
4881

4982
// nothing running + 10m inactivity
50-
func (s *SharedDM) Close() error {
51-
s.openmu.Lock()
52-
defer s.openmu.Unlock()
53-
if !s.dm.stage.opened.Load() {
83+
func (sdm *sharedDM) Close() error {
84+
if !sdm.IsOpen() {
85+
return nil
86+
}
87+
sdm.ocmu.Lock()
88+
if !sdm.IsOpen() {
89+
sdm.ocmu.Unlock()
5490
return nil
5591
}
56-
if len(s.rxcbs) > 0 {
57-
return fmt.Errorf("cannot close %s: %v", s.trname(), s.rxcbs) // TODO -- FIXME: cleanup
92+
93+
var (
94+
xid string
95+
l int
96+
)
97+
sdm.rxmu.Lock()
98+
for xid = range sdm.rxcbs {
99+
break
100+
}
101+
l = len(sdm.rxcbs)
102+
103+
if l > 0 {
104+
sdm.rxmu.Unlock()
105+
debug.Assert(cos.IsValidUUID(xid), xid)
106+
sdm.ocmu.Unlock()
107+
return fmt.Errorf("cannot close %s: [%s, %d]", sdm.trname(), xid, l)
58108
}
59-
s.rxcbs = nil
60-
s.dm.Close(nil)
61-
nlog.InfoDepth(1, core.T.String(), "close", s.trname())
109+
sdm.rxcbs = nil
110+
sdm.rxmu.Unlock()
111+
112+
sdm.dm.Close(nil)
113+
sdm.dm.UnregRecv()
114+
sdm.ocmu.Unlock()
115+
116+
nlog.InfoDepth(1, core.T.String(), "close", sdm.trname())
62117
return nil
63118
}
64119

65-
func (s *SharedDM) RegRecv(xid string, cb transport.RecvObj) {
66-
s.rxmu.Lock()
67-
debug.Assert(s.rxcbs[xid] == nil)
68-
s.rxcbs[xid] = cb
69-
s.rxmu.Unlock()
120+
func (sdm *sharedDM) RegRecv(xid string, cb transport.RecvObj) {
121+
sdm.ocmu.Lock()
122+
sdm.rxmu.Lock()
123+
if !sdm.IsOpen() {
124+
sdm.rxmu.Unlock()
125+
sdm.ocmu.Unlock()
126+
debug.Assert(false, sdm.trname(), " ", "closed")
127+
return
128+
}
129+
debug.Assert(sdm.rxcbs[xid] == nil)
130+
sdm.rxcbs[xid] = cb
131+
sdm.rxmu.Unlock()
132+
sdm.ocmu.Unlock()
70133
}
71134

72-
func (s *SharedDM) UnregRecv(xid string) {
73-
s.rxmu.Lock()
74-
delete(s.rxcbs, xid)
75-
s.rxmu.Unlock()
135+
func (sdm *sharedDM) UnregRecv(xid string) {
136+
sdm.ocmu.Lock()
137+
sdm.rxmu.Lock()
138+
if !sdm.IsOpen() {
139+
sdm.rxmu.Unlock()
140+
sdm.ocmu.Unlock()
141+
debug.Assert(false, sdm.trname(), " ", "closed")
142+
return
143+
}
144+
delete(sdm.rxcbs, xid)
145+
sdm.rxmu.Unlock()
146+
sdm.ocmu.Unlock()
76147
}
77148

78-
// NOTE (and limitation): use hdr.Opaque exclusively for xaction IDs
79-
func (s *SharedDM) recv(hdr *transport.ObjHdr, r io.Reader, err error) error {
149+
func (sdm *sharedDM) recv(hdr *transport.ObjHdr, r io.Reader, err error) error {
80150
if err != nil {
81151
return err
82152
}
83153
xid := string(hdr.Opaque)
84154
if err := xact.CheckValidUUID(xid); err != nil {
85-
return fmt.Errorf("%s: %v", s.trname(), err)
155+
return fmt.Errorf("%s: %v", sdm.trname(), err)
86156
}
87-
s.rxmu.Lock()
88-
cb, ok := s.rxcbs[xid]
89-
s.rxmu.Unlock()
157+
158+
sdm.rxmu.Lock()
159+
if !sdm.IsOpen() {
160+
sdm.rxmu.Unlock()
161+
return fmt.Errorf("%s is closed, dropping recv [xid: %s, oname: %s]", sdm.trname(), xid, hdr.ObjName)
162+
}
163+
cb, ok := sdm.rxcbs[xid]
164+
sdm.rxmu.Unlock()
165+
90166
if !ok {
91-
return fmt.Errorf("%s: no registered handler for xact %q", s.trname(), xid)
167+
return fmt.Errorf("%s: xid %s not found, dropping recv [oname: %s]", sdm.trname(), xid, hdr.ObjName)
92168
}
93169
return cb(hdr, r, nil)
94170
}

0 commit comments

Comments (0)