Skip to content
This repository has been archived by the owner on Jun 6, 2024. It is now read-only.

Commit

Permalink
Merge pull request #80 from o19s/REST/exposed-query_id
Browse files Browse the repository at this point in the history
Exposed query id and HTTP TRACE
  • Loading branch information
jzonthemtn authored Mar 4, 2024
2 parents 7e8e211 + 28db76c commit f49fb9a
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 9 deletions.
4 changes: 2 additions & 2 deletions docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ version: '3'
services:
opensearch:
build: ./
container_name: opensearch
container_name: opensearch_ubi
environment:
discovery.type: single-node
node.name: opensearch
Expand All @@ -13,7 +13,7 @@ services:
# Warning: this is opening it up to all cross domains
# http.cors.allow-origin: "http://localhost"...
http.cors.allow-origin: "*"
http.cors.allow-methods: OPTIONS,HEAD,GET,POST,PUT,DELETE
http.cors.allow-methods: OPTIONS,TRACE,HEAD,GET,POST,PUT,DELETE
http.cors.allow-credentials: true
http.cors.allow-headers: X-Requested-With,X-Auth-Token,Content-Type,Content-Length,Authorization
plugins.ubi.indices: "awesome"
Expand Down
1 change: 1 addition & 0 deletions documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ The plugin exposes a REST API.
| `DELETE` | `/_plugins/ubi/{store}` | Delete a backend store |
| `GET` | `/_plugins/ubi` | Get a list of all stores |
| `POST` | `/_plugins/ubi/{store}` | Index events into the store |
| `TRACE` | `/_plugins/ubi` | For temporary developer debugging |

### Creating a Store

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ public class UserBehaviorInsightsActionFilter implements ActionFilter {
private final Settings settings;
private final ThreadPool threadPool;

public Settings getSettings(){
return this.settings;
}

public UserBehaviorInsightsActionFilter(final Backend backend, final Settings settings, ThreadPool threadPool) {
this.backend = backend;
this.settings = settings;
Expand Down Expand Up @@ -106,10 +110,13 @@ public void onResponse(Response response) {
LOGGER.error("Unable to persist query.", ex);
}

LOGGER.info("Setting and exposing query_id {}", queryId);
//HACK: this should be set in the OpenSearch config (to send to the client code just once),
// and not on every single search response,
// but that server setting doesn't appear to be exposed.
threadPool.getThreadContext().addResponseHeader("Access-Control-Expose-Headers", "query_id");
threadPool.getThreadContext().addResponseHeader("query_id", queryId);

//}

final long elapsedTime = System.currentTimeMillis() - startTime;
LOGGER.info("UBI search request filter took {} ms", elapsedTime);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,14 @@
import org.opensearch.rest.BaseRestHandler;
import org.opensearch.rest.BytesRestResponse;
import org.opensearch.rest.RestRequest;
import org.opensearch.ubi.HeaderConstants;
import org.opensearch.ubi.backends.Backend;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.UUID;

import static org.opensearch.rest.RestRequest.Method.*;

Expand All @@ -49,13 +52,14 @@ public List<Route> routes() {
new Route(PUT, "/_plugins/ubi/{store}"), // Initializes the store.
new Route(DELETE, "/_plugins/ubi/{store}"), // Deletes a store.
new Route(GET, "/_plugins/ubi"), // Lists all stores
new Route(TRACE, "/_plugins/ubi"), // for debugging rest weirdness
new Route(POST, "/_plugins/ubi/{store}")); // Indexes events into the store.
}

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nodeClient) {

LOGGER.log(Level.INFO, "received event");
LOGGER.log(Level.INFO, "{}: received event", request.method());

if (request.method() == PUT) {

Expand All @@ -66,7 +70,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod
return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "missing store name"));
}

LOGGER.info("Creating UBL store {}", storeName);
LOGGER.info("Creating UBI store {}", storeName);

return (channel) -> {
/*if(backend.exists(storeName)) {
Expand All @@ -86,7 +90,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod
return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.BAD_REQUEST, "missing store name"));
}

LOGGER.info("Deleting UBL store {}", storeName);
LOGGER.info("Deleting UBI store {}", storeName);

return (channel) -> {
backend.delete(storeName);
Expand All @@ -104,7 +108,7 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod
return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.NOT_FOUND, "store not found"));
}*/

LOGGER.info("Queuing event for storage into UBL store {}", storeName);
LOGGER.info("Queuing event for storage into UBI store {}", storeName);
final String eventJson = request.content().utf8ToString();

try {
Expand All @@ -130,7 +134,35 @@ protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient nod

return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, s));

}
} else if (request.method() == TRACE) {
LOGGER.warn("TRACE");

final Map<String, List<String>> headers = request.getHeaders();
LOGGER.info("Exposed headers: " + String.join(",", headers.keySet()));

List<String> ids = headers.get(HeaderConstants.QUERY_ID_HEADER.toString());
String queryId = null;
if(ids == null || ids.size() == 0){
LOGGER.warn("Null REST parameter: {}. Using default id.", HeaderConstants.QUERY_ID_HEADER);
queryId = UUID.randomUUID().toString();
}
else {
queryId = ids.get(0);
}

final Set<String> stores = backend.get();


final String s = "query_id:" + queryId + "&stores:" + String.join(",", stores);

BytesRestResponse response = new BytesRestResponse(RestStatus.OK, "application/x-www-form-urlencoded", s);
response.addHeader("Access-Control-Expose-Headers", "query_id");
response.addHeader("query_id", queryId);

return (channel) -> channel.sendResponse(response);
}
else
LOGGER.warn("Unknown method " + request.method());

// TODO: Return a list names of all search_relevance stores.
return (channel) -> channel.sendResponse(new BytesRestResponse(RestStatus.OK, "ok"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@

public abstract class AbstractEventManager {

@SuppressWarnings("unused")
private final Logger LOGGER = LogManager.getLogger(AbstractEventManager.class);

protected final EventQueue eventQueue;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"DELETE",
"GET",
"POST",
"TRACE",
"PUT"
]
}
Expand Down

0 comments on commit f49fb9a

Please sign in to comment.