Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/prometheus/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,9 @@ func main() {
a.Flag("storage.remote.flush-deadline", "How long to wait flushing sample on shutdown or config reload.").
Default("1m").PlaceHolder("<duration>").SetValue(&cfg.RemoteFlushDeadline)

a.Flag("storage.remote.read-sample-limit", "Maximum overall number of samples to return via the remote read interface, in a single query. 0 means no limit.").
Default("5e7").IntVar(&cfg.web.RemoteReadLimit)

a.Flag("rules.alert.for-outage-tolerance", "Max time to tolerate prometheus outage for restoring 'for' state of alert.").
Default("1h").SetValue(&cfg.outageTolerance)

Expand Down
23 changes: 22 additions & 1 deletion storage/remote/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,19 @@ import (
// decodeReadLimit is the maximum size of a read request body in bytes.
const decodeReadLimit = 32 * 1024 * 1024

// HTTPError is an error value that pairs a human-readable message with the
// HTTP status code that should accompany it in a response.
type HTTPError struct {
	msg    string
	status int
}

// Compile-time check that HTTPError (by value) satisfies the error interface.
var _ error = HTTPError{}

// Error implements the error interface and yields the stored message.
func (he HTTPError) Error() string { return he.msg }

// Status reports the HTTP status code stored in the error.
func (he HTTPError) Status() int { return he.status }

// DecodeReadRequest reads a prompb.ReadRequest from an http.Request.
func DecodeReadRequest(r *http.Request) (*prompb.ReadRequest, error) {
compressed, err := ioutil.ReadAll(io.LimitReader(r.Body, decodeReadLimit))
Expand Down Expand Up @@ -134,14 +147,22 @@ func FromQuery(req *prompb.Query) (int64, int64, []*labels.Matcher, *storage.Sel
}

// ToQueryResult builds a QueryResult proto.
func ToQueryResult(ss storage.SeriesSet) (*prompb.QueryResult, error) {
func ToQueryResult(ss storage.SeriesSet, sampleLimit int) (*prompb.QueryResult, error) {
numSamples := 0
resp := &prompb.QueryResult{}
for ss.Next() {
series := ss.At()
iter := series.Iterator()
samples := []*prompb.Sample{}

for iter.Next() {
numSamples++
if sampleLimit > 0 && numSamples > sampleLimit {
return nil, HTTPError{
msg: fmt.Sprintf("exceeded sample limit (%d)", sampleLimit),
status: http.StatusBadRequest,
}
}
ts, val := iter.At()
samples = append(samples, &prompb.Sample{
Timestamp: ts,
Expand Down
2 changes: 1 addition & 1 deletion storage/remote/read_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func TestSeriesSetFilter(t *testing.T) {

for i, tc := range tests {
filtered := newSeriesSetFilter(FromQueryResult(tc.in), tc.toRemove)
have, err := ToQueryResult(filtered)
have, err := ToQueryResult(filtered, 1e6)
if err != nil {
t.Fatal(err)
}
Expand Down
30 changes: 19 additions & 11 deletions web/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,9 +131,10 @@ type API struct {
flagsMap map[string]string
ready func(http.HandlerFunc) http.HandlerFunc

db func() *tsdb.DB
enableAdmin bool
logger log.Logger
db func() *tsdb.DB
enableAdmin bool
logger log.Logger
remoteReadLimit int
}

// NewAPI returns an initialized API type.
Expand All @@ -149,19 +150,22 @@ func NewAPI(
enableAdmin bool,
logger log.Logger,
rr rulesRetriever,
remoteReadLimit int,
) *API {
return &API{
QueryEngine: qe,
Queryable: q,
targetRetriever: tr,
alertmanagerRetriever: ar,
now: time.Now,
config: configFunc,
flagsMap: flagsMap,
ready: readyFunc,
db: db,
enableAdmin: enableAdmin,
rulesRetriever: rr,

now: time.Now,
config: configFunc,
flagsMap: flagsMap,
ready: readyFunc,
db: db,
enableAdmin: enableAdmin,
rulesRetriever: rr,
remoteReadLimit: remoteReadLimit,
}
}

Expand Down Expand Up @@ -793,8 +797,12 @@ func (api *API) remoteRead(w http.ResponseWriter, r *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp.Results[i], err = remote.ToQueryResult(set)
resp.Results[i], err = remote.ToQueryResult(set, api.remoteReadLimit)
if err != nil {
if httpErr, ok := err.(remote.HTTPError); ok {
http.Error(w, httpErr.Error(), httpErr.Status())
return
}
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Expand Down
7 changes: 6 additions & 1 deletion web/api/v1/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func setupRemote(s storage.Storage) *httptest.Server {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
resp.Results[i], err = remote.ToQueryResult(set)
resp.Results[i], err = remote.ToQueryResult(set, 1e6)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
Expand Down Expand Up @@ -833,6 +833,7 @@ func TestReadEndpoint(t *testing.T) {
},
}
},
remoteReadLimit: 1e6,
}

// Encode the request.
Expand Down Expand Up @@ -861,6 +862,10 @@ func TestReadEndpoint(t *testing.T) {
recorder := httptest.NewRecorder()
api.remoteRead(recorder, request)

if recorder.Code/100 != 2 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test for the too large case as well?

t.Fatal(recorder.Code)
}

// Decode the response.
compressed, err = ioutil.ReadAll(recorder.Result().Body)
if err != nil {
Expand Down
2 changes: 2 additions & 0 deletions web/web.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ type Options struct {
ConsoleLibrariesPath string
EnableLifecycle bool
EnableAdminAPI bool
RemoteReadLimit int
}

func instrumentHandler(handlerName string, handler http.HandlerFunc) http.HandlerFunc {
Expand Down Expand Up @@ -227,6 +228,7 @@ func New(logger log.Logger, o *Options) *Handler {
h.options.EnableAdminAPI,
logger,
h.ruleManager,
h.options.RemoteReadLimit,
)

if o.RoutePrefix != "/" {
Expand Down