Skip to content

Commit

Permalink
SEARCH-8101 Cancel the query upon timeout
Browse files Browse the repository at this point in the history
  • Loading branch information
dcheckoway committed Dec 10, 2024
1 parent 2df1dba commit 4ef78d0
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 1 deletion.
6 changes: 5 additions & 1 deletion pkg/plugin/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,11 @@ func (d *Datasource) query(_ context.Context, _ backend.PluginContext, dataQuery
elapsed := time.Since(startTime)
// If there's a configured timeout, ensure we don't let the query run longer than that
if maxQueryDuration > 0 && elapsed >= maxQueryDuration {
// TODO: cancel the query
backend.Logger.Debug("query timed out, canceling", "jobId", jobId)
err := d.SearchAPI.CancelQuery(jobId)
if err != nil {
backend.Logger.Warn("failed to cancel query", "jobId", jobId, "err", err)
}
return backend.ErrDataResponse(backend.StatusBadRequest, fmt.Sprintf("Job %s still not finished after %v (status=%v). Consider using a scheduled search to speed this up. https://docs.cribl.io/search/scheduled-searches/", jobId, maxQueryDuration, status))
}
a, b = b, a+b // Fibonacci backoff
Expand Down
33 changes: 33 additions & 0 deletions pkg/plugin/search_api.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package plugin

import (
"bytes"
"encoding/json"
"errors"
"fmt"
Expand Down Expand Up @@ -59,6 +60,12 @@ func (api *SearchAPI) RunQueryAndGetResults(queryParams *url.Values) (*SearchQue
return &result, nil
}

// Cancel a search query.
func (api *SearchAPI) CancelQuery(jobId string) error {
_, err := api.doPOST(fmt.Sprintf("/api/v1/m/default_search/search/jobs/%s/cancel", jobId), nil, "application/json", []byte("{}"))
return err
}

// Load the list of saved search IDs available to the user corresponding to the API creds.
// This can be used to populate a dropdown to make it easy for the user to pick one.
// Returns a list of saved search IDs.
Expand Down Expand Up @@ -99,6 +106,32 @@ func (api *SearchAPI) doGET(uri string, queryParams *url.Values) ([]byte, error)
if err != nil {
return nil, fmt.Errorf("GET request failed: %v", err.Error())
}
return api.readResponse(res)
}

// Perform a GET request to the API, returning the raw response body as a byte array
func (api *SearchAPI) doPOST(uri string, queryParams *url.Values, contentType string, data []byte) ([]byte, error) {
req, err := http.NewRequest("POST", api.url(uri), bytes.NewBuffer(data))
if err != nil {
return nil, fmt.Errorf("failed to create POST request: %v", err.Error())
}
err = api.addAuthorization(req)
if err != nil {
return nil, fmt.Errorf("failed to add Authorization: %v", err.Error())
}
if queryParams != nil {
req.URL.RawQuery = queryParams.Encode()
}
req.Header.Set("Content-Type", contentType)
backend.Logger.Debug("http POST", "URL", req.URL.String(), "contentType", contentType)
res, err := http.DefaultClient.Do(req)
if err != nil {
return nil, fmt.Errorf("POST request failed: %v", err.Error())
}
return api.readResponse(res)
}

func (api *SearchAPI) readResponse(res *http.Response) ([]byte, error) {
defer res.Body.Close()
responseBody, err := io.ReadAll(res.Body)
if err != nil {
Expand Down

0 comments on commit 4ef78d0

Please sign in to comment.