Merge branch 'aecio/issue-74' into master (issue #74)
aecio committed Apr 26, 2017
2 parents 1f73525 + b6c5ba8 commit 739e423
Showing 7 changed files with 302 additions and 14 deletions.
1 change: 1 addition & 0 deletions build.gradle
@@ -45,6 +45,7 @@ dependencies {
compile 'nz.ac.waikato.cms.weka:weka-stable:3.6.13'
compile 'org.apache.lucene:lucene-core:4.10.4'
compile 'org.elasticsearch:elasticsearch:1.4.4'
compile 'org.elasticsearch.client:rest:5.3.0'
compile 'io.dropwizard.metrics:metrics-core:3.1.3'
compile 'io.dropwizard.metrics:metrics-json:3.1.3'
compile 'io.dropwizard.metrics:metrics-jvm:3.1.3'
46 changes: 38 additions & 8 deletions docs/data-formats.rst
@@ -2,6 +2,8 @@
Data Formats
############

.. highlight:: yaml

ACHE can store data in different data formats. The data format can be configured by changing the key ``target_storage.data_format.type`` in the `configuration file <https://github.com/ViDA-NYU/ache/blob/master/config/sample_config/ache.yml>`_.
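
For example, the following line in ``ache.yml`` selects the Elasticsearch data
format described later on this page::

    target_storage.data_format.type: ELASTICSEARCH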

The data formats currently available are:
@@ -70,7 +72,7 @@ The ELASTICSEARCH data format stores raw content and metadata as documents in an
Types and fields
************************

Currently, ACHE indexes documents into two ElasticSearch types:
Currently, ACHE indexes documents into two Elasticsearch types:

* ``target``, for pages classified as on-topic by the page classifier
* ``negative``, for pages classified as off-topic by the page classifier
@@ -91,23 +93,51 @@ These two types use the same schema, which has the following fields:
Configuration
*************

To use ElasticSearch, you need to enable the following lines in the configuration file ``ache.yml``::
To use the Elasticsearch data format, you need to add the following line to the
configuration file ``ache.yml``::

    target_storage.data_format.type: ELASTICSEARCH
    target_storage.data_format.elasticsearch.host: localhost
    target_storage.data_format.elasticsearch.port: 9300
    target_storage.data_format.elasticsearch.cluster_name: elasticsearch

You will also need to specify the host address and port where Elasticsearch is running.
See the following subsections for more details.

**REST Client (ACHE version 0.8+)**

Starting in version 0.8, ACHE uses the official
`Java REST client <https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/index.html>`_
to connect to Elasticsearch. You can specify one or more Elasticsearch node
addresses which the REST client should connect to using the following lines:

.. code:: yaml

    target_storage.data_format.elasticsearch.rest.hosts:
      - http://node1:9200
      - http://node2:9200

.. warning ::

    ``target_storage.data_format.elasticsearch.port`` should point to the transport client port (which defaults to 9300), not the JSON API port.
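
You can verify which port serves the JSON REST API by querying the root endpoint
(this assumes Elasticsearch's default REST port of 9200; the response below is
abbreviated and illustrative)::

    curl http://localhost:9200
    # {
    #   "cluster_name" : "elasticsearch",
    #   "version" : { "number" : "5.3.0", ... },
    #   "tagline" : "You Know, for Search"
    # }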

The following additional parameters can also be configured. Refer to
the Elasticsearch `REST Client documentation <https://www.elastic.co/guide/en/elasticsearch/client/java-rest/current/_timeouts.html>`_
for more information on these parameters.

.. code:: yaml

    target_storage.data_format.elasticsearch.rest.connect_timeout: 30000
    target_storage.data_format.elasticsearch.rest.socket_timeout: 30000
    target_storage.data_format.elasticsearch.rest.max_retry_timeout_millis: 90000

**Transport Client (deprecated)**

You can also configure ACHE to connect to Elasticsearch v1.x using the native
transport client by adding the following lines::

    target_storage.data_format.elasticsearch.host: localhost
    target_storage.data_format.elasticsearch.port: 9300
    target_storage.data_format.elasticsearch.cluster_name: elasticsearch


Command line parameters
****************************************

When running ACHE using ElasticSearch, you should provide the name of the ElasticSearch index that should be used in the command line using the following arguments::
When running ACHE using Elasticsearch, you should provide the name of the Elasticsearch index to be used via the following command-line argument::

    -e <arg>
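
For example, a crawl storing pages in an index named ``ache-data`` could be
started as shown below (the ``startCrawl`` sub-command and the ``-c``, ``-s``,
and ``-o`` flags are the usual ACHE crawl parameters; the exact values are
illustrative)::

    ache startCrawl -c config/ -s seeds.txt -o data/ -e ache-data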

2 changes: 1 addition & 1 deletion docs/page-classifiers.rst
@@ -8,7 +8,7 @@ Page classifiers are flexible and can be as simple as a simple regular expression
or a sophisticated machine-learning based classification model.

Configuring Page Classifiers
===========================
============================

To configure a page classifier, you will need to create a new directory
containing a file named ``pageclassifier.yml`` specifying the type of
@@ -23,7 +23,7 @@
import java.util.List;
import java.util.TreeSet;

import org.apache.http.annotation.NotThreadSafe;
//import org.apache.http.annotation.NotThreadSafe;
import org.apache.http.client.CookieStore;
import org.apache.http.cookie.Cookie;
import org.apache.http.cookie.CookieIdentityComparator;
@@ -33,7 +33,7 @@
* HttpComponents Changes: removed synchronization
*
*/
@NotThreadSafe
//@NotThreadSafe
public class LocalCookieStore implements CookieStore, Serializable {

private static final long serialVersionUID = -7581093305228232025L;
12 changes: 9 additions & 3 deletions src/main/java/focusedCrawler/target/TargetStorage.java
@@ -13,6 +13,7 @@
import focusedCrawler.target.classifier.TargetClassifierException;
import focusedCrawler.target.classifier.TargetClassifierFactory;
import focusedCrawler.target.model.Page;
import focusedCrawler.target.repository.ElasticSearchRestTargetRepository;
import focusedCrawler.target.repository.ElasticSearchTargetRepository;
import focusedCrawler.target.repository.FileSystemTargetRepository;
import focusedCrawler.target.repository.FileSystemTargetRepository.DataFormat;
@@ -186,9 +187,14 @@ else if(dataFormat.equals("ELASTICSEARCH")) {
if(indexName == null) {
throw new IllegalArgumentException("ElasticSearch index name not provided!");
}
ElasticSearchConfig esconfig = config.getElasticSearchConfig();
targetRepository = new ElasticSearchTargetRepository(esconfig, indexName, "target");
negativeRepository = new ElasticSearchTargetRepository(esconfig, indexName, "negative");
ElasticSearchConfig esconfig = config.getElasticSearchConfig();
if (esconfig.getRestApiHosts() == null) {
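// No REST hosts configured: keep using the old native transport client repository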
targetRepository = new ElasticSearchTargetRepository(esconfig, indexName, "target");
negativeRepository = new ElasticSearchTargetRepository(esconfig, indexName, "negative");
} else {
targetRepository = new ElasticSearchRestTargetRepository(esconfig, indexName, "target");
negativeRepository = new ElasticSearchRestTargetRepository(esconfig, indexName, "negative");
}
}
else {
throw new IllegalArgumentException("Invalid data format provided: "+dataFormat);
214 changes: 214 additions & 0 deletions src/main/java/focusedCrawler/target/repository/ElasticSearchRestTargetRepository.java
@@ -0,0 +1,214 @@
package focusedCrawler.target.repository;

import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLEncoder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

import org.apache.http.HttpHost;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.entity.AbstractHttpEntity;
import org.apache.http.entity.ContentType;
import org.apache.http.nio.entity.NStringEntity;
import org.apache.http.util.EntityUtils;
import org.elasticsearch.client.Response;
import org.elasticsearch.client.RestClient;
import org.elasticsearch.client.RestClientBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;

import focusedCrawler.target.model.Page;
import focusedCrawler.target.model.TargetModelElasticSearch;
import focusedCrawler.target.repository.elasticsearch.ElasticSearchConfig;

public class ElasticSearchRestTargetRepository implements TargetRepository {

private static final Map<String, String> EMPTY_MAP = Collections.<String, String>emptyMap();
private static final Logger logger = LoggerFactory.getLogger(ElasticSearchRestTargetRepository.class);
private static final ObjectMapper mapper = new ObjectMapper();

static {
mapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS);
}

private RestClient client;
private String typeName;
private String indexName;

public ElasticSearchRestTargetRepository(ElasticSearchConfig config,
String indexName,
String typeName) {
this.indexName = indexName;
this.typeName = typeName;
this.client = createRestClient(config);
this.createIndexMapping(indexName);
}

private void createIndexMapping(String indexName) {

String indexEndpoint = "/" + indexName;
boolean exists = false;
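// Fallback version string: "5.x.x".startsWith("5.") holds, so the 5.x mappings below are used when version detection fails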
String esVersion = "5.x.x";
try {
Response existsResponse = client.performRequest("HEAD", indexEndpoint);
exists = (existsResponse.getStatusLine().getStatusCode() == 200);

Response rootResponse = client.performRequest("GET", "/");
String json = EntityUtils.toString(rootResponse.getEntity());
String versionNumber = mapper.readTree(json).path("version").path("number").asText();
if (versionNumber != null && !versionNumber.isEmpty()) {
esVersion = versionNumber;
}
logger.info("Elasticsearch version: {}", esVersion);
} catch (IOException e) {
throw new RuntimeException(
"Failed to check whether index already exists in Elasticsearch.", e);
}

if (!exists) {
final String targetMapping1x = ""
+ "{"
+ " \"properties\": {"
+ " \"domain\": {\"type\": \"string\",\"index\": \"not_analyzed\"},"
+ " \"words\": {\"type\": \"string\",\"index\": \"not_analyzed\"},"
+ " \"wordsMeta\": {\"type\": \"string\",\"index\": \"not_analyzed\"},"
+ " \"retrieved\": {\"type\": \"date\",\"format\": \"dateOptionalTime\"},"
+ " \"text\": {\"type\": \"string\"},"
+ " \"title\": {\"type\": \"string\"},"
+ " \"url\": {\"type\": \"string\",\"index\": \"not_analyzed\"},"
+ " \"topPrivateDomain\": {\"type\": \"string\",\"index\": \"not_analyzed\"}"
+ " }"
+ "}";

final String pageMapping5x =""
+ "{"
+ " \"properties\": {"
+ " \"domain\": {\"type\": \"keyword\",\"index\": true},"
+ " \"words\": {\"type\": \"keyword\",\"index\": true},"
+ " \"wordsMeta\": {\"type\": \"keyword\",\"index\": true},"
+ " \"retrieved\": {\"type\": \"date\",\"format\": \"dateOptionalTime\"},"
+ " \"text\": {\"type\": \"text\"},"
+ " \"title\": {\"type\": \"text\"},"
+ " \"url\": {\"type\": \"keyword\",\"index\":true},"
+ " \"topPrivateDomain\": {\"type\": \"keyword\",\"index\": true}"
+ " }"
+ "}";

String pageProperties = esVersion.startsWith("5.") ? pageMapping5x : targetMapping1x;

String mapping =
"{"
+ " \"mappings\": {"
+ " \"target\": "+ pageProperties + ","
+ " \"negative\": "+ pageProperties
+ " }"
+ "}";

try {
AbstractHttpEntity entity = createJsonEntity(mapping);
Response response = client.performRequest("PUT", indexEndpoint, EMPTY_MAP, entity);
if (response.getStatusLine().getStatusCode() != 200) {
throw new RuntimeException(
"Failed to create index in Elasticsearch." + response.toString());
}
} catch (IOException e) {
throw new RuntimeException("Failed to create index in Elasticsearch.", e);
}
}
}

private AbstractHttpEntity createJsonEntity(String mapping) {
return new NStringEntity(mapping, ContentType.APPLICATION_JSON);
}

public boolean insert(Page target) {
return index(target);
}

private boolean index(Page page) {

TargetModelElasticSearch data = new TargetModelElasticSearch(page);

String docId = encodeUrl(page.getURL().toString());
String endpoint = "/" + indexName + "/" + typeName + "/" + docId;
AbstractHttpEntity entity = createJsonEntity(serializeAsJson(data));
try {
Response response = client.performRequest("PUT", endpoint, EMPTY_MAP, entity);
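// Elasticsearch replies 201 (Created) for a new document; overwriting an existing URL returns 200, which this method reports as false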
return response.getStatusLine().getStatusCode() == 201;
} catch (IOException e) {
throw new RuntimeException("Failed to index page.", e);
}
}

private String encodeUrl(String url) {
try {
return URLEncoder.encode(url, "UTF-8");
} catch (UnsupportedEncodingException e) {
throw new IllegalStateException("Failed to URL encode string: "+url, e);
}
}

private String serializeAsJson(Object model) {
String targetAsJson;
try {
targetAsJson = mapper.writeValueAsString(model);
} catch (JsonProcessingException e) {
throw new RuntimeException("Failed to serialize TargetModel to JSON.", e);
}
return targetAsJson;
}

public RestClient createRestClient(ElasticSearchConfig config) {

List<String> esHosts = config.getRestApiHosts();
List<HttpHost> hosts = new ArrayList<>();
for (String host : esHosts) {
try {
URL url = new URL(host);
// Pass the scheme through so that https:// hosts are not silently downgraded to HttpHost's default (http)
hosts.add(new HttpHost(url.getHost(), url.getPort(), url.getProtocol()));
} catch (MalformedURLException e) {
throw new RuntimeException("Failed to initialize Elasticsearch REST client. "
+ "Invalid host: " + host, e);
}
}

HttpHost[] httpHostsArray = hosts.toArray(new HttpHost[hosts.size()]);

client = RestClient.builder(httpHostsArray)
.setRequestConfigCallback(new RestClientBuilder.RequestConfigCallback() {
@Override
public RequestConfig.Builder customizeRequestConfig(RequestConfig.Builder requestConfigBuilder) {
return requestConfigBuilder
.setConnectTimeout(config.getRestConnectTimeout())
.setSocketTimeout(config.getRestSocketTimeout());
}
})
.setMaxRetryTimeoutMillis(config.getRestMaxRetryTimeoutMillis())
.build();

logger.info("Initialized Elasticsearch REST client for hosts: "+Arrays.toString(httpHostsArray));
return client;
}

@Override
public void close() {
try {
if (client != null) {
client.close();
}
} catch (IOException e) {
throw new RuntimeException("Failed to close Elasticsearch REST client", e);
}
}

}