Skip to content

Commit 0affbd7

Browse files
committed
get-batch (multi-node flow passing; major update)
- designated target (DT) xaction management responsibility - with xreg (use-previous) exception
- end-to-end control flow (proxy → DT → senders → DT redirect)
- shared-dm intra-cluster transport with per x-moss demuxing
- hole-plugging recv logic to preserve strict input order
- streaming mode via `archive.Writer` with on-the-fly emission
- buffered multipart mode with full `apc.MossResp` metadata

------------- [ short tests ] ------------------------------
- passing except for: multi-node + compressed TAR

------------- [ separately ] ------------------------------
- 2PC constants: ActBegin → Begin2PC, etc.
- `cos.GenYAID` micro-optimization

Signed-off-by: Alex Aizman <alex.aizman@gmail.com>
1 parent d329343 commit 0affbd7

File tree

22 files changed

+647
-381
lines changed

22 files changed

+647
-381
lines changed

ais/ml.go

Lines changed: 211 additions & 106 deletions
Large diffs are not rendered by default.

ais/prxbsumm.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ func (p *proxy) bsummNew(qbck *cmn.QueryBcks, msg *apc.BsummCtrlMsg) (err error)
5656
args := allocBcArgs()
5757
args.req = cmn.HreqArgs{
5858
Method: http.MethodGet,
59-
Path: apc.URLPathBuckets.Join(qbck.Name, apc.ActBegin), // compare w/ txn
59+
Path: apc.URLPathBuckets.Join(qbck.Name, apc.Begin2PC), // compare w/ txn
6060
Query: q,
6161
Body: cos.MustMarshal(actMsgExt),
6262
}
@@ -91,7 +91,7 @@ func (p *proxy) bsummCollect(qbck *cmn.QueryBcks, msg *apc.BsummCtrlMsg) (_ cmn.
9191
)
9292
args.req = cmn.HreqArgs{
9393
Method: http.MethodGet,
94-
Path: apc.URLPathBuckets.Join(qbck.Name, apc.ActQuery),
94+
Path: apc.URLPathBuckets.Join(qbck.Name, apc.Query2PC),
9595
Body: cos.MustMarshal(actMsgExt),
9696
}
9797
args.smap = p.owner.smap.get()

ais/prxclu.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1820,7 +1820,7 @@ func (p *proxy) actBackend(w http.ResponseWriter, r *http.Request, tag string, u
18201820
return
18211821
}
18221822
// (two-phase commit)
1823-
for _, phase := range []string{apc.ActBegin, apc.ActCommit} {
1823+
for _, phase := range []string{apc.Begin2PC, apc.Commit2PC} {
18241824
var (
18251825
path string
18261826
args = allocBcArgs()

ais/prxtxn.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func (c *txnCln) init(msg *apc.ActMsg, bck *meta.Bck, config *cmn.Config, waitms
7272
}
7373

7474
func (c *txnCln) begin(what fmt.Stringer) (err error) {
75-
results := c.bcast(apc.ActBegin, c.timeout.netw)
75+
results := c.bcast(apc.Begin2PC, c.timeout.netw)
7676
for _, res := range results {
7777
if res.err != nil {
7878
err = res.toErr()
@@ -88,7 +88,7 @@ func (c *txnCln) begin(what fmt.Stringer) (err error) {
8888
// that can be run concurrently - comma-separated list of UUIDs
8989
func (c *txnCln) commit(what fmt.Stringer, timeout time.Duration) (xid string, all []string, err error) {
9090
same4all := true
91-
results := c.bcast(apc.ActCommit, timeout)
91+
results := c.bcast(apc.Commit2PC, timeout)
9292

9393
for _, res := range results {
9494
if res.err != nil {
@@ -134,7 +134,7 @@ func (c *txnCln) cmtTout(waitmsync bool) time.Duration {
134134

135135
func (c *txnCln) bcast(phase string, timeout time.Duration) (results sliceResults) {
136136
c.req.Path = cos.JoinWords(c.path, phase)
137-
if phase != apc.ActAbort {
137+
if phase != apc.Abort2PC {
138138
now := time.Now()
139139
c.req.Query.Set(apc.QparamUnixTime, cos.UnixNano2S(now.UnixNano()))
140140
}
@@ -157,7 +157,7 @@ func (c *txnCln) bcast(phase string, timeout time.Duration) (results sliceResult
157157

158158
func (c *txnCln) bcastAbort(what fmt.Stringer, err error) {
159159
nlog.Errorf("Abort %q %s: %v %s", c.msg.Action, what, err, c.msg)
160-
results := c.bcast(apc.ActAbort, 0)
160+
results := c.bcast(apc.Abort2PC, 0)
161161
freeBcastRes(results)
162162
}
163163

@@ -986,7 +986,7 @@ func prmBegin(c *txnCln, bck *meta.Bck, singleT bool) (num int64, allAgree bool,
986986
var cksumVal, totalN string
987987
allAgree = !singleT
988988

989-
results := c.bcast(apc.ActBegin, c.timeout.netw)
989+
results := c.bcast(apc.Begin2PC, c.timeout.netw)
990990
for i, res := range results {
991991
if res.err != nil {
992992
err = res.toErr()

ais/test/moss_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -74,8 +74,6 @@ func (c *mossConfig) name() (s string) {
7474
}
7575

7676
func TestMoss(t *testing.T) {
77-
tools.CheckSkip(t, &tools.SkipTestArgs{MaxTargets: 1}) // TODO -- FIXME: remove
78-
7977
var (
8078
numPlainObjs = 500 // plain objects to create
8179
numArchives = 400 // num shards to create
@@ -140,6 +138,14 @@ func TestMoss(t *testing.T) {
140138

141139
for _, test := range tests {
142140
t.Run(test.name(), func(t *testing.T) {
141+
// TODO -- FIXME: remove when fixed multi-node
142+
if test.outputFormat != "" && test.outputFormat != archive.ExtTar && test.outputFormat != archive.ExtZip {
143+
smap := tools.GetClusterMap(t, tools.GetPrimaryURL())
144+
if nt := smap.CountTargets(); nt > 1 {
145+
t.Skipf("skipping %s: still need to fix multi-node (t=%d) for output format %q", test.name(), nt, test.outputFormat)
146+
}
147+
}
148+
143149
var (
144150
bck = cmn.Bck{Name: trand.String(15), Provider: apc.AIS}
145151
m = ioContext{
@@ -263,7 +269,6 @@ func testMossArchives(t *testing.T, m *ioContext, test *mossConfig, numArchives,
263269
}
264270
}
265271
}
266-
267272
if test.streaming {
268273
testMossStreaming(t, m, test, mossIn)
269274
} else {
@@ -480,10 +485,7 @@ func isMissingFile(mossIn *apc.MossIn) bool {
480485
return true
481486
}
482487
// Missing archive paths have ".nonexistent" suffix
483-
if mossIn.ArchPath != "" && strings.HasSuffix(mossIn.ArchPath, mossMissingSuffix) {
484-
return true
485-
}
486-
return false
488+
return mossIn.ArchPath != "" && strings.HasSuffix(mossIn.ArchPath, mossMissingSuffix)
487489
}
488490

489491
func testMossStreaming(t *testing.T, m *ioContext, test *mossConfig, mossIn []apc.MossIn) {
@@ -692,6 +694,7 @@ func validateTarMultipartWithArchive(t *testing.T, req *apc.MossReq, resp apc.Mo
692694
mossOut = &resp.Out[i]
693695
tarEntry = &callback.entries[i]
694696
)
697+
// tlog.Logf("DEBUG: mossOut.Bucket = %q, ObjName = %q\n", mossOut.Bucket, mossOut.ObjName)
695698

696699
// Calculate expected name based on onlyObjName setting
697700
if req.OnlyObjName {

ais/tgtbck.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ func (t *target) httpbckget(w http.ResponseWriter, r *http.Request, dpq *dpq) {
125125
} else {
126126
bucket, phase = apiItems[0], apiItems[1]
127127
}
128-
if phase != apc.ActBegin && phase != apc.ActQuery {
128+
if phase != apc.Begin2PC && phase != apc.Query2PC {
129129
t.writeErrURL(w, r)
130130
return
131131
}
@@ -299,7 +299,7 @@ func (t *target) listObjects(w http.ResponseWriter, r *http.Request, bck *meta.B
299299
}
300300

301301
func (t *target) bsumm(w http.ResponseWriter, r *http.Request, phase string, bck *meta.Bck, msg *apc.BsummCtrlMsg, dpq *dpq) {
302-
if phase == apc.ActBegin {
302+
if phase == apc.Begin2PC {
303303
rns := xreg.RenewBckSummary(bck, msg)
304304
if rns.Err != nil {
305305
t.writeErr(w, r, rns.Err, http.StatusInternalServerError, Silent)
@@ -309,7 +309,7 @@ func (t *target) bsumm(w http.ResponseWriter, r *http.Request, phase string, bck
309309
return
310310
}
311311

312-
debug.Assert(phase == apc.ActQuery, phase)
312+
debug.Assert(phase == apc.Query2PC, phase)
313313
xctn, err := xreg.GetXact(msg.UUID) // vs. hk.OldAgeX removal
314314
if err != nil {
315315
t.writeErr(w, r, err, http.StatusInternalServerError)

ais/tgtcp.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -285,7 +285,7 @@ func (t *target) daeputItems(w http.ResponseWriter, r *http.Request, apiItems []
285285
t.writeErrf(w, r, "expecting cloud storage provider (have %q)", provider)
286286
return
287287
}
288-
if phase != apc.ActBegin && phase != apc.ActCommit {
288+
if phase != apc.Begin2PC && phase != apc.Commit2PC {
289289
t.writeErrf(w, r, "expecting 'begin' or 'commit' phase (have %q)", phase)
290290
return
291291
}
@@ -314,7 +314,7 @@ func (t *target) enableBackend(w http.ResponseWriter, r *http.Request, provider,
314314
switch {
315315
case bp != nil:
316316
t.writeErrf(w, r, "backend %q is already enabled, nothing to do", provider)
317-
case phase == apc.ActBegin:
317+
case phase == apc.Begin2PC:
318318
nlog.Infof("ready to enable backend %q", provider)
319319
default:
320320
var err error
@@ -357,7 +357,7 @@ func (t *target) disableBackend(w http.ResponseWriter, r *http.Request, provider
357357
switch {
358358
case bp == nil:
359359
t.writeErrf(w, r, "backend %q is already disabled, nothing to do", provider)
360-
case phase == apc.ActBegin:
360+
case phase == apc.Begin2PC:
361361
nlog.Infof("ready to disable backend %q", provider)
362362
default:
363363
// NOTE: not locking bp := t.Backend()

0 commit comments

Comments
 (0)