From e360c4667b2b643eaa27d7ee2a70d69c104022d4 Mon Sep 17 00:00:00 2001 From: Quitterie Lucas Date: Thu, 18 Aug 2022 15:20:22 +0200 Subject: [PATCH] =?UTF-8?q?=E2=9C=A8(backends)=20add=20graylog=20service?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit WIP. --- .env.dist | 10 + docker-compose.yml | 19 ++ docs/backends.md | 16 ++ src/ralph/backends/logging/__init__.py | 0 src/ralph/backends/logging/base.py | 17 ++ src/ralph/backends/logging/graylog.py | 210 ++++++++++++++++++++ tests/backends/logging/__init__.py | 0 tests/backends/logging/test_base.py | 22 +++ tests/backends/logging/test_graylog.py | 264 +++++++++++++++++++++++++ 9 files changed, 558 insertions(+) create mode 100644 src/ralph/backends/logging/__init__.py create mode 100644 src/ralph/backends/logging/base.py create mode 100644 src/ralph/backends/logging/graylog.py create mode 100644 tests/backends/logging/__init__.py create mode 100644 tests/backends/logging/test_base.py create mode 100644 tests/backends/logging/test_graylog.py diff --git a/.env.dist b/.env.dist index d0611bef7..8c04249dd 100644 --- a/.env.dist +++ b/.env.dist @@ -4,6 +4,16 @@ RALPH_APP_DIR=/app/.ralph # Uncomment lines (by removing # characters at the beginning of target lines) # to define environment variables associated to the backend(s) you need. +# Graylog storage backend + +# RALPH_GRAYLOG_HOST=graylog +# RALPH_GRAYLOG_PORT=12201 +# RALPH_GRAYLOG_ADMIN_USERNAME=admin +# GRAYLOG_ADMIN_PASSWORD=pass +# GRAYLOG_INPUT_TITLE=TCPInput +# GRAYLOG_INPUT_TYPE=org.graylog2.inputs.gelf.tcp.GELFTCPInput +# GRAYLOG_API_URL=http://graylog:9000 + # LDP storage backend # # You need to generate an API token for your OVH's account and fill the service diff --git a/docker-compose.yml b/docker-compose.yml index 37bbf2b2b..cb183d0b4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -41,6 +41,25 @@ services: environment: KS_SWIFT_PUBLIC_URL: http://127.0.0.1:49177 + graylog: + image: graylog/graylog:4.2.2 + environment: + - GRAYLOG_PASSWORD_SECRET=somepasswordpepper + - GRAYLOG_ROOT_PASSWORD_SHA2=d74ff0ee8da3b9806b18c877dbf29bbde50b5bd8e4dad7a3a725000feb82e8f1 + - GRAYLOG_HTTP_EXTERNAL_URI=http://localhost:9000/ + - GRAYLOG_ROTATION_STRATEGY=count + entrypoint: /usr/bin/tini -- wait-for-it elasticsearch:9200 -- /docker-entrypoint.sh + networks: + default: + depends_on: + - mongo + - elasticsearch + ports: + # Graylog web interface and REST API + - 9000:9000 + # GELF TCP + - 12201:12201 + # -- tools dockerize: image: jwilder/dockerize diff --git a/docs/backends.md b/docs/backends.md index 61fa39b45..b13c8d118 100644 --- a/docs/backends.md +++ b/docs/backends.md @@ -145,3 +145,19 @@ Elasticsearch backend parameters required to connect to a cluster are: > For a complete list of supported `client_options`, please refer to the > [official client's > documentation](https://elasticsearch-py.readthedocs.io/en/latest/api.html#elasticsearch). + +## Logging backends + +### Graylog + +The Graylog backend is a log management solution mostly used for storing both structured and +unstructured records of application activities. + +#### Backend parameters + +- `host`: the name of the Docker graylog service (_e.g._`graylog`) +- `port`: the port of the Graylog backend +- `username`: the username of the Web interface admin account (_e.g._`admin`) +- `pass`: the password of the Web interface admin account (_e.g._`pass`) +- `input_title`: the title of the configured input (_e.g._`TCP Input`) +- `input_type`: the type of the configured input (_e.g._`org.graylog2.inputs.gelf.tcp.GELFTCPInput`) diff --git a/src/ralph/backends/logging/__init__.py b/src/ralph/backends/logging/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/src/ralph/backends/logging/base.py b/src/ralph/backends/logging/base.py new file mode 100644 index 000000000..06c9652a1 --- /dev/null +++ b/src/ralph/backends/logging/base.py @@ -0,0 +1,17 @@ +"""Base logging backend for Ralph""" + +from abc import ABC, abstractmethod + + +class BaseLogging(ABC): + """Base logging backend interface.""" + + name = "base" + + @abstractmethod + def get(self, chunk_size=10): + """Read chunk_size records and stream them to stdout.""" + + @abstractmethod + def send(self, chunk_size=10, ignore_errors=False): + """Write chunk_size records from stdin.""" diff --git a/src/ralph/backends/logging/graylog.py b/src/ralph/backends/logging/graylog.py new file mode 100644 index 000000000..a84df5004 --- /dev/null +++ b/src/ralph/backends/logging/graylog.py @@ -0,0 +1,210 @@ +"""Graylog storage backend for Ralph""" + +import json +import logging +import sys +import urllib +from functools import cached_property + +import requests +from logging_gelf.formatters import GELFFormatter +from logging_gelf.handlers import GELFTCPSocketHandler + +from ...defaults import ( + GRAYLOG_ADMIN_PASSWORD, + GRAYLOG_ADMIN_USERNAME, + GRAYLOG_API_URL, + GRAYLOG_HOST, + GRAYLOG_INPUT_TITLE, + GRAYLOG_INPUT_TYPE, + GRAYLOG_PORT, +) +from ..mixins import HistoryMixin +from .base import BaseLogging + +logger = logging.getLogger(__name__) + + +class GraylogAPI: + """Defines Graylog API useful endpoints functions.""" + + def __init__(self, base_url, username, password, headers): + + self.base_url = base_url + self.username = username + self.password = password + self.headers = headers + + @property + def _auth(self): + return (self.username, self.password) + + def get(self, endpoint, params=None): + """Perform Graylog API GET request.""" + + with requests.get( + urllib.parse.urljoin(self.base_url, endpoint), + params=params, + auth=self._auth, + headers=self.headers, + stream=True, + ) as result: + + result.raise_for_status() + + yield result + + def post(self, endpoint, data): + """Perform Graylog API POST request.""" + + with requests.post( + urllib.parse.urljoin(self.base_url, endpoint), + auth=self._auth, + headers=self.headers, + json=data, + ) as result: + result.raise_for_status() + + yield result + + def put(self, endpoint, data): + """Perform Graylog API PUT request.""" + + with requests.put( + urllib.parse.urljoin(self.base_url, endpoint), + auth=self._auth, + headers=self.headers, + json=data, + ) as result: + result.raise_for_status() + + yield result + + def get_node_id(self): + """Returns node id of the Graylog cluster.""" + + return next(iter(json.loads(self.get(endpoint="/api/cluster")))) + + def list_inputs(self): + """Returns list of the created inputs on the Graylog cluster.""" + + return self.get("/api/system/inputs").text + + def create_input(self, data): + """Creates a new input on the Graylog cluster.""" + + return self.post("/api/system/inputs", data=data) + + def input_state(self, input_id): + """Returns identified input with `given_id`.""" + + return self.get(f"/api/system/inputstates/{input_id}").text + + def activate_input(self, input_id): + """Activates a launched input.""" + + return self.put(f"/api/system/inputstates/{input_id}") + + def search_logs(self, params): + """Returns logs matching given `params` parameters.""" + + return self.get("/api/search/universal/relative", params=params) + + +class GraylogLogging(HistoryMixin, BaseLogging): + """Graylog logging backend""" + + # pylint: disable=too-many-arguments, too-many-instance-attributes + + name = "graylog" + + def __init__( + self, + host=GRAYLOG_HOST, + port=GRAYLOG_PORT, + username=GRAYLOG_ADMIN_USERNAME, + password=GRAYLOG_ADMIN_PASSWORD, + title=GRAYLOG_INPUT_TITLE, + input_type=GRAYLOG_INPUT_TYPE, + api_url=GRAYLOG_API_URL, + ): + self.host = host + self.port = port + self.username = username + self.password = password + self.title = title + self.type = input_type + self.api_url = api_url + + self.gelf_logger = logging.getLogger("gelf") + self.gelf_logger.setLevel(logging.INFO) + + self.api = GraylogAPI( + base_url=self.api_url, + username=self.username, + password=self.password, + headers={ + "X-Requested-By": "Ralph Malph", + "Content-Type": "application/json", + }, + ) + + @cached_property + def input_configuration(self): + """Returns configuration of the used input.""" + + return { + "bind_address": self.host, + "max_message_size": 2097152, + "number_worker_threads": 8, + "port": int(self.port), + "recv_buffer_size": 1048576, + "tls_client_auth": "disabled", + "tls_enable": False, + "use_null_delimiter": True, + } + + @cached_property + def input(self): + """Returns input configuration""" + + return { + "node": self.api.get_node_id(), + "configuration": self.input_configuration, + "global": False, + "title": self.title, + "type": self.type, + } + + def send(self, chunk_size=10, ignore_errors=False): + """Send logs in graylog backend (one JSON event per line).""" + + logger.debug("Logging events (chunk size: %d)", chunk_size) + + inputs = json.loads(self.api.list_inputs())["inputs"] + + try: + current_input = next(filter(lambda input: input["title"] == self.title, inputs)) + except StopIteration: + current_input = json.loads(self.api.launch_input(data=self.input)) + self.api.activate_input(input_id=current_input.get("id")) + + handler = GELFTCPSocketHandler(host=self.host, port=self.port) + handler.setFormatter(GELFFormatter()) + self.gelf_logger.addHandler(handler) + + for event in sys.stdin: + self.gelf_logger.info(event) + + def get(self, chunk_size=10): + """Read chunk_size records and stream them to stdout.""" + + logger.debug("Fetching events (chunk_size: %d)", chunk_size) + messages = json.loads(self.api.search_logs(params={"query": "*"}))["messages"] + + events = [message["message"]["message"] for message in messages] + chunks = [events[i : i + chunk_size] for i in range(0, len(events), chunk_size)] + + for chunk in chunks: + for event in chunk: + sys.stdout.buffer.write(bytes(f"{event}" + "\n", encoding="utf-8")) diff --git a/tests/backends/logging/__init__.py b/tests/backends/logging/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/tests/backends/logging/test_base.py b/tests/backends/logging/test_base.py new file mode 100644 index 000000000..fdad0ed54 --- /dev/null +++ b/tests/backends/logging/test_base.py @@ -0,0 +1,22 @@ +"""Tests for Ralph base logging backend""" + +from ralph.backends.logging.base import BaseLogging + + +def test_backends_logging_base_abstract_interface_with_implemented_abstract_method(): + """Tests the interface mechanism with properly implemented abstract methods.""" + + class GoodLogging(BaseLogging): + """Correct implementation with required abstract methods.""" + + name = "good" + + def get(self, chunk_size=10): + """Fakes the get method.""" + + def send(self, chunk_size=10, ignore_errors=False): + """Fakes the send method.""" + + GoodLogging() + + assert GoodLogging.name == "good" diff --git a/tests/backends/logging/test_graylog.py b/tests/backends/logging/test_graylog.py new file mode 100644 index 000000000..926527d6a --- /dev/null +++ b/tests/backends/logging/test_graylog.py @@ -0,0 +1,264 @@ +"""Tests for Ralph graylog storage backend""" + +import json +import sys +from io import StringIO + +import pytest + +from ralph.backends.logging.graylog import GraylogAPI, GraylogLogging +from ralph.defaults import ( + GRAYLOG_ADMIN_PASSWORD, + GRAYLOG_ADMIN_USERNAME, + GRAYLOG_HOST, + GRAYLOG_PORT, +) + + +def test_backends_logging_graylog_logging_instantiation(): + """Tests the GraylogLogging backend instantiation.""" + # pylint: disable=protected-access + + logging = GraylogLogging( + host=GRAYLOG_HOST, + port=GRAYLOG_PORT, + username=GRAYLOG_ADMIN_USERNAME, + password=GRAYLOG_ADMIN_PASSWORD, + ) + + assert logging.name == "graylog" + assert logging.host == "graylog" + assert logging.port == 12201 + + +@pytest.mark.parametrize("base_url", ["http://graylog:9000"]) +@pytest.mark.parametrize("username", ["admin"]) +@pytest.mark.parametrize("password", ["pass"]) +@pytest.mark.parametrize( + "headers", + [ + { + "X-Requested-By": "Ralph Malph", + "Content-Type": "application/json", + } + ], +) +def test_graylog_api_good_instantiation(base_url, username, password, headers): + """Tests the GraylogAPI instantiation.""" + + api = GraylogAPI( + base_url=base_url, username=username, password=password, headers=headers + ) + + assert api.base_url == "http://graylog:9000" + assert api.username == "admin" + assert api.password == "pass" + + +@pytest.mark.parametrize("base_url", ["http://graylog:9000"]) +@pytest.mark.parametrize("username", ["admin"]) +@pytest.mark.parametrize("password", ["pass"]) +@pytest.mark.parametrize( + "headers", + [ + { + "X-Requested-By": "Ralph Malph", + "Content-Type": "application/json", + } + ], +) +def test_graylog_api_get_node_id_method( + monkeypatch, base_url, username, password, headers +): + """Tests that the `get_node_id` method returns the expected node UUID.""" + + api = GraylogAPI( + base_url=base_url, username=username, password=password, headers=headers + ) + + def mock_get(*args, **kwargs): + """Always returns text attributes of a successful get method on '/api/cluster' + endpoint. + """ + # pylint: disable=unused-argument + + return json.dumps({"bc1c7764-5c7c-4cc0-92b9-ec2759ac1fa0": {"text": "foo"}}) + + monkeypatch.setattr(GraylogAPI, "get", mock_get) + result = api.get_node_id() + + assert result == "bc1c7764-5c7c-4cc0-92b9-ec2759ac1fa0" + + +def test_backends_logging_graylog_logging_send_method_should_activate_existing_input( + monkeypatch, +): + """Tests if a Graylog backend correctly activates a configured TCP input.""" + + logging = GraylogLogging( + host=GRAYLOG_HOST, + port=GRAYLOG_PORT, + username=GRAYLOG_ADMIN_USERNAME, + password=GRAYLOG_ADMIN_PASSWORD, + ) + + def mock_get_node_id(*args, **kwargs): + """Always returns a Graylog node id (of UUID type).""" + # pylint: disable=unused-argument + + return "bc1c7764-5c7c-4cc0-92b9-ec2759ac1fa0" + + def mock_list_inputs(*args, **kwargs): + """Returns the list of configured inputs in the case a TCP input has been + configured. + """ + # pylint: disable=unused-argument + + return { + "inputs": [ + { + "title": "TCP input", + "global": False, + "name": "GELF TCP", + "created_at": "2021-11-22T19:29:42.622Z", + "type": "org.graylog2.inputs.gelf.tcp.GELFTCPInput", + "creator_user_id": "admin", + "attributes": { + "bind_address": "graylog", + "max_message_size": 2097152, + "number_worker_threads": 8, + "recv_buffer_size": 1048576, + "tls_client_auth": "disabled", + "tls_enable": False, + "use_null_delimiter": True, + }, + "static_fields": {}, + "node": "bc1c7764-5c7c-4cc0-92b9-ec2759ac1fa0", + "id": "619befa63f959f44ab2ce10a", + } + ], + "total": 1, + } + + def mock_activate_input(*args, **kwargs): + """Always returns an input id.""" + # pylint: disable=unused-argument + + return json.dumps({"id": "619befa63f959f44ab2ce10a"}) + + monkeypatch.setattr( + "sys.stdin", StringIO("\n".join([json.dumps({"id": idx}) for idx in range(10)])) + ) + monkeypatch.setattr(logging.api, "get_node_id", mock_get_node_id) + monkeypatch.setattr(logging.api, "list_inputs", mock_list_inputs) + monkeypatch.setattr(logging.api, "activate_input", mock_activate_input) + + assert ( + logging.api.activate_input( + input_id=logging.api.list_inputs().get("inputs")[0].get("id") + ) + == '{"id": "619befa63f959f44ab2ce10a"}' + ) + + +def test_backends_logging_graylog_logging_send_method_should_launch_input_if_not_exist( + monkeypatch, +): + """Tests logging Graylog backend launches a given configured input if it is not + already configured. + """ + + logging = GraylogLogging( + host=GRAYLOG_HOST, + port=GRAYLOG_PORT, + username=GRAYLOG_ADMIN_USERNAME, + password=GRAYLOG_ADMIN_PASSWORD, + ) + + def mock_get_node_id(*args, **kwargs): + """Always returns a Graylog node id (of UUID type).""" + # pylint: disable=unused-argument + + return "bc1c7764-5c7c-4cc0-92b9-ec2759ac1fa0" + + def mock_list_inputs(*args, **kwargs): + """Returns the list of configured inputs in the case no input has been + configured. + """ + # pylint: disable=unused-argument + + return {"inputs": [], "total": 0} + + def mock_launch_input(*args, **kwargs): + """Returns a dictionnary containing the id of a configured input newly + created. + """ + # pylint: disable=unused-argument + + return {"id": "61a0f59b1d3fab0f365fbba6"} + + def mock_activate_input(input_id, *args, **kwargs): + """Always returns an input id.""" + # pylint: disable=unused-argument + + return json.dumps({"id": input_id}) + + monkeypatch.setattr( + "sys.stdin", StringIO("\n".join([json.dumps({"id": idx}) for idx in range(10)])) + ) + monkeypatch.setattr(logging.api, "get_node_id", mock_get_node_id) + monkeypatch.setattr(logging.api, "list_inputs", mock_list_inputs) + monkeypatch.setattr(logging.api, "launch_input", mock_launch_input) + monkeypatch.setattr(logging.api, "activate_input", mock_activate_input) + + assert ( + logging.api.activate_input( + # pylint: disable=no-value-for-parameter + input_id=logging.api.launch_input().get("id") + ) + == '{"id": "61a0f59b1d3fab0f365fbba6"}' + ) + + +def test_backends_logging_graylog_logging_get_method_should_return_messages_by_chunk( + monkeypatch, +): + """Tests the `get` method of `GraylogLogging` backend returns logged messages.""" + + logging = GraylogLogging( + host=GRAYLOG_HOST, + port=GRAYLOG_PORT, + username=GRAYLOG_ADMIN_USERNAME, + password=GRAYLOG_ADMIN_PASSWORD, + ) + + def mock_search_logs(*args, **kwargs): + """Returns the logged messages.""" + # pylint: disable=unused-argument + + return json.dumps( + { + "messages": [ + {"message": {"message": "message_1"}}, + {"message": {"message": "message_2"}}, + {"message": {"message": "message_3"}}, + {"message": {"message": "message_4"}}, + ] + } + ) + + monkeypatch.setattr(logging.api, "search_logs", mock_search_logs) + + output = [] + + def mock_stdout_write(message, *args, **kwargs): + """Appends a given message to a list each time it is called.""" + # pylint: disable=unused-argument + + return output.append(bytes(message)) + + monkeypatch.setattr(sys.stdout.buffer, "write", mock_stdout_write) + + logging.get(chunk_size=2) + + assert len(output) == len(json.loads(mock_search_logs())["messages"])