Eagerly fetch data in hot-storage #807
Conversation
Signed-off-by: Ignacio Hagopian <jsign.uy@gmail.com>
FFSDealFinalityTimeout:       time.Minute * 30,
FFSMaxParallelDealPreparing:  1,
FFSGCAutomaticGCInterval:     0,
FFSRetrievalNextEventTimeout: time.Hour,
A new config attribute to set a timeout for retrievals that might get stuck. If we don't receive any data or event during the retrieval within this duration, we fail.
Unfortunately, there are situations/bugs in Lotus in which a retrieval might get stuck, so this is a safety net.
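A minimal, self-contained sketch of the pattern with generic names (not the actual Powergate code): because time.After is evaluated on every select iteration, the timer restarts after each received event, so it measures time since the last event rather than total retrieval time.

package main

import (
	"fmt"
	"time"
)

// waitEvents drains events until the channel closes, failing if no event
// arrives within nextEventTimeout.
func waitEvents(events <-chan string, nextEventTimeout time.Duration) error {
	for {
		select {
		case <-time.After(nextEventTimeout):
			return fmt.Errorf("didn't receive events for %s", nextEventTimeout)
		case e, ok := <-events:
			if !ok {
				return nil // producer finished and closed the channel
			}
			fmt.Println("event:", e)
		}
	}
}

func main() {
	events := make(chan string)
	go func() {
		defer close(events)
		for i := 0; i < 3; i++ {
			time.Sleep(100 * time.Millisecond)
			events <- fmt.Sprintf("progress %d", i)
		}
	}()
	if err := waitEvents(events, time.Second); err != nil {
		fmt.Println("error:", err)
	}
}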
@@ -387,6 +389,7 @@ func setupFlags() error {
	pflag.String("ffsminerselector", "reputation", "Miner selector to be used by FFS: 'sr2', 'reputation'.")
	pflag.String("ffsminerselectorparams", "", "Miner selector configuration parameter, depends on --ffsminerselector.")
	pflag.String("ffsminimumpiecesize", "67108864", "Minimum piece size in bytes allowed to be stored in Filecoin.")
	pflag.Duration("ffsretrievalnexteventtimeout", time.Hour, "Maximum amount of time to wait for the next retrieval event before erroring it.")
Would be great in the future to change all duration flags to .Duration.
Created #803
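For reference, a minimal sketch of the difference using hypothetical flag names (not flags from this repo):

package main

import (
	"log"
	"time"

	"github.com/spf13/pflag"
)

func main() {
	// String-style flag: the value stays a string and must be parsed by hand.
	strTimeout := pflag.String("sometimeoutstr", "30m", "Timeout kept as a string.")
	// Duration-style flag: pflag validates and converts it at parse time.
	durTimeout := pflag.Duration("sometimeout", time.Hour, "Timeout parsed as a time.Duration.")
	pflag.Parse()

	// With the string flag, invalid values only surface here.
	manual, err := time.ParseDuration(*strTimeout)
	if err != nil {
		log.Fatalf("parsing duration flag: %s", err)
	}

	// With the Duration flag, bad values are already rejected by pflag.Parse().
	log.Printf("manual=%s parsed=%s", manual, *durTimeout)
}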
Yea, I've been using that lately, quite nice.
log.Infof("in progress retrieval errored: %s", err) | ||
log.Infof("in progress retrieval errored: %s", e.Err) |
Wrong error variable.
@@ -72,11 +72,15 @@ func (ci *CoreIpfs) Stage(ctx context.Context, iid ffs.APIID, r io.Reader) (cid.
	return p.Cid(), nil
}

// StageCid stage-pin a Cid.
// StageCid pull the Cid data and stage-pin it.
We're changing the meaning of StageCid to do something similar to what Stage does.
Before, it only tracked the Cid as "stage-pinned" (that temporary pin we create to allow multitenancy).
Now it also pins the data in the go-ipfs node, instead of assuming that's already true.
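A minimal sketch of what the new behavior implies, with hypothetical field and helper names (ci.ipfs standing for the go-ipfs CoreAPI client, trackStagePin for whatever records the stage-pin); the actual code in this PR may differ:

// StageCid pulls the Cid data from the IPFS network and stage-pins it.
func (ci *CoreIpfs) StageCid(ctx context.Context, iid ffs.APIID, c cid.Cid) error {
	// Pin().Add is recursive by default: it fetches the whole DAG from the
	// network if it isn't already in the go-ipfs node, and pins it.
	if err := ci.ipfs.Pin().Add(ctx, path.IpfsPath(c)); err != nil {
		return fmt.Errorf("pinning cid %s: %s", c, err)
	}
	// Keep tracking the Cid as stage-pinned for this APIID, as before,
	// so multitenant accounting and GC of staged data still work.
	return ci.trackStagePin(iid, c) // hypothetical helper
}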
for {
	select {
	case <-time.After(fc.retrNextEventTimeout):
		return ffs.FetchInfo{}, fmt.Errorf("didn't receive events for %d minutes", int64(fc.retrNextEventTimeout.Minutes()))
	case e, ok := <-events:
		if !ok {
			break Loop
		}
		if e.Err != "" {
			return ffs.FetchInfo{}, fmt.Errorf("event error in retrieval progress: %s", e.Err)
		}
		strEvent := retrievalmarket.ClientEvents[e.Event]
		strDealStatus := retrievalmarket.DealStatuses[e.Status]
		fundsSpent = e.FundsSpent.Uint64()
		newMsg := fmt.Sprintf("Received %s, total spent: %sFIL (%s/%s)", humanize.IBytes(e.BytesReceived), util.AttoFilToFil(fundsSpent), strEvent, strDealStatus)
		if newMsg != lastMsg {
			fc.l.Log(ctx, newMsg)
			lastMsg = newMsg
		}
		lastEvent = e
	}
}
if lastEvent.Status != retrievalmarket.DealStatusCompleted {
	return ffs.FetchInfo{}, fmt.Errorf("retrieval failed with status %s and message %s", retrievalmarket.DealStatuses[lastEvent.Status], lastMsg)
}
TL;DR:
- Use the timeout to fail if the retrieval got stuck.
- If the events channel is closed, we check that the last received event is exactly the one that means the retrieval ended successfully. If for some reason the channel gets closed in some non-final status, that's wrong and we should error.
AddTimeout: 480, // 8 min
AddTimeout: 15 * 60, // 15min
I believe this is a better default for big data. The user can change it anyway.
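For a rough sense of scale, assuming (purely for illustration) a sustained pull rate of ~10 MiB/s from the IPFS network: the old 8-minute timeout covers about 4.7 GiB, while 15 minutes covers about 8.8 GiB before the staging errors out.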
Worth updating everywhere in the db using the massive change tool?
Yep, I have that as a task in my pre-deployment steps. Using powcfg to do it :)
// We want to avoid relying on Lotus working in online-mode.
// We need to take care ourselves of pulling the data from
// the IPFS network.
if !a.Cfg.Hot.Enabled && a.Cfg.Cold.Enabled {
	s.l.Log(ctx, "Automatically staging Cid from the IPFS network...")
	stageCtx, cancel := context.WithTimeout(ctx, time.Duration(a.Cfg.Hot.Ipfs.AddTimeout)*time.Second)
	defer cancel()
	if err := s.hs.StageCid(stageCtx, a.APIID, a.Cid); err != nil {
		return ffs.StorageInfo{}, nil, fmt.Errorf("automatically staging cid: %s", err)
	}
}
OK, so the gist of all this.
The main problem was that we were using IpfsOnlineMode=true in the Lotus node for the Hub case, but that has a bug for retrievals. So what I do here is simply force staging the Cid into hot-storage in Powergate, and let Lotus use the go-ipfs HTTP API in offline mode, since the data will be there thanks to Powergate.
TBH, IpfsOnlineMode=true was much more elegant than this pre-fetching; but this is mostly a workaround for the bug, not really a fix for something wrong on our side.
Miner: offer.Miner.String(),
Miner: offer.MinerPeer.Address.String(),
Solves a situation in which miners claim that the retrieval will be done from a worker address rather than their owner address. This might confuse the user, since the deal was made with miner X but the retrieval appears to come from miner Y (and miner Y hasn't stored data in the network and looks mostly empty).
Looks good!
This PR:
- Stops relying on IpfsOnlineMode=true in the Lotus node, since it has a bug for retrievals.
- The StageCid method also pulls the data now from the IPFS network.

The main motivation for this change is: textileio/textile#533