Skip to content

Commit

Permalink
✨(backends) add graylog service
Browse files Browse the repository at this point in the history
The LDP backend is available for tests, but until now no service was
available in the project. We have implemented a Graylog service for it.
  • Loading branch information
quitterie-lcs committed Mar 24, 2022
1 parent dae895f commit aa1eda6
Show file tree
Hide file tree
Showing 17 changed files with 626 additions and 11 deletions.
10 changes: 10 additions & 0 deletions .env.dist
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,16 @@ RALPH_APP_DIR=/app/.ralph
# Uncomment lines (by removing # characters at the beginning of target lines)
# to define environment variables associated to the backend(s) you need.

# Graylog storage backend

# RALPH_GRAYLOG_HOST=graylog
# RALPH_GRAYLOG_PORT=12201
# RALPH_GRAYLOG_ADMIN_USERNAME=admin
# GRAYLOG_ADMIN_PASSWORD=pass
# GRAYLOG_INPUT_TITLE=TCPInput
# GRAYLOG_INPUT_TYPE=org.graylog2.inputs.gelf.tcp.GELFTCPInput
# GRAYLOG_API_URL=http://graylog:9000

# LDP storage backend
#
# You need to generate an API token for your OVH's account and fill the service
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ and this project adheres to

### Added

- `graylog` logging backend
- Implement edx problem interaction events pydantic models
- Implement edx textbook interaction events pydantic models
- `ws` websocket stream backend (compatible with the `fetch` command)
Expand Down
11 changes: 10 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,7 @@ run: \
run-all: ## start all supported local backends
run-all: \
run-es \
run-graylog \
run-swift
.PHONY: run-all

Expand All @@ -123,6 +124,14 @@ run-es: ## start elasticsearch backend
@$(COMPOSE_RUN) dockerize -wait tcp://elasticsearch:9200 -timeout 60s
.PHONY: run-es

run-graylog: ## start graylog backend
@$(COMPOSE) up -d graylog
@echo "Waiting for graylog to be up and running..."
@$(COMPOSE_RUN) dockerize -wait tcp://mongo:27017 -timeout 60s
@$(COMPOSE_RUN) dockerize -wait tcp://elasticsearch:9200 -timeout 60s
@$(COMPOSE_RUN) dockerize -wait tcp://graylog:9000 -timeout 60s
.PHONY: run-graylog

run-swift: ## start swift backend
@$(COMPOSE) up -d swift
@echo "Waiting for swift to be up and running..."
Expand All @@ -138,7 +147,7 @@ stop: ## stops backend servers
.PHONY: stop

test: ## run back-end tests
test: run-es
test: run-es run-graylog
bin/pytest
.PHONY: test

Expand Down
30 changes: 28 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
version: '3.4'
version: "3.4"

services:
app:
Expand All @@ -17,7 +17,9 @@ services:
- "${RALPH_RUNSERVER_PORT:-8100}:${RALPH_RUNSERVER_PORT:-8100}"
volumes:
- .:/app

networks:
default:

# -- backends
elasticsearch:
image: elasticsearch:7.16.3
Expand All @@ -40,6 +42,30 @@ services:
environment:
KS_SWIFT_PUBLIC_URL: http://127.0.0.1:49177

mongo:
image: mongo:4.2
networks:
default:

graylog:
image: graylog/graylog:4.2.2
environment:
- GRAYLOG_PASSWORD_SECRET=somepasswordpepper
- GRAYLOG_ROOT_PASSWORD_SHA2=d74ff0ee8da3b9806b18c877dbf29bbde50b5bd8e4dad7a3a725000feb82e8f1
- GRAYLOG_HTTP_EXTERNAL_URI=http://localhost:9000/
- GRAYLOG_ROTATION_STRATEGY=count
entrypoint: /usr/bin/tini -- wait-for-it elasticsearch:9200 -- /docker-entrypoint.sh
networks:
default:
depends_on:
- mongo
- elasticsearch
ports:
# Graylog web interface and REST API
- 9000:9000
# GELF TCP
- 12201:12201

# -- tools
dockerize:
image: jwilder/dockerize
Expand Down
16 changes: 16 additions & 0 deletions docs/backends.md
Original file line number Diff line number Diff line change
Expand Up @@ -143,3 +143,19 @@ Elasticsearch backend parameters required to connect to a cluster are:
> For a complete list of supported `client_options`, please refer to the
> [official client's
> documentation](https://elasticsearch-py.readthedocs.io/en/latest/api.html#elasticsearch).
## Logging backends

### Graylog

The Graylog backend is a log management solution used to store both structured and
unstructured records of application activity.

#### Backend parameters

- `host`: the name of the Docker graylog service (_e.g._ `graylog`)
- `port`: the port of the Graylog GELF input
- `username`: the username of the Web interface admin account (_e.g._ `admin`)
- `password`: the password of the Web interface admin account (_e.g._ `pass`)
- `title`: the title of the configured input (_e.g._ `TCP Input`)
- `input_type`: the type of the configured input (_e.g._ `org.graylog2.inputs.gelf.tcp.GELFTCPInput`)
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ skip_glob=venv
profile=black

[tool:pytest]
addopts = -v --cov-report term-missing --cov-config=.coveragerc --cov=src/ralph --hypothesis-show-statistics
addopts = -v --cov-report term-missing --cov-config=.coveragerc --cov=src/ralph
python_files =
test_*.py
tests.py
Expand Down
1 change: 1 addition & 0 deletions src/ralph/backends/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,6 @@ class BackendTypes(Enum):
"""Backend types"""

DATABASE = auto()
LOGGING = auto()
STORAGE = auto()
STREAM = auto()
Empty file.
17 changes: 17 additions & 0 deletions src/ralph/backends/logging/base.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
"""Base logging backend for Ralph"""

from abc import ABC, abstractmethod


class BaseLogging(ABC):
    """Abstract interface that every Ralph logging backend implements.

    Concrete backends must provide `get` (read stored events out) and
    `send` (write incoming events in).
    """

    # Identifier used to select this backend from the CLI/configuration.
    name = "base"

    @abstractmethod
    def get(self, chunk_size=10):
        """Read chunk_size records and stream them to stdout."""

    @abstractmethod
    def send(self, chunk_size=10, ignore_errors=False):
        """Write chunk_size records from stdin."""
210 changes: 210 additions & 0 deletions src/ralph/backends/logging/graylog.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
"""Graylog storage backend for Ralph"""

import json
import logging
import sys
import urllib
from functools import cached_property

import requests
from logging_gelf.formatters import GELFFormatter
from logging_gelf.handlers import GELFTCPSocketHandler

from ...defaults import (
GRAYLOG_ADMIN_PASSWORD,
GRAYLOG_ADMIN_USERNAME,
GRAYLOG_API_URL,
GRAYLOG_HOST,
GRAYLOG_INPUT_TITLE,
GRAYLOG_INPUT_TYPE,
GRAYLOG_PORT,
)
from ..mixins import HistoryMixin
from .base import BaseLogging

logger = logging.getLogger(__name__)


class GraylogAPI:
    """Small client wrapping the Graylog REST API endpoints used by the backend."""

    def __init__(self, base_url, username, password, headers):
        """Store connection settings.

        Args:
            base_url (str): root URL of the Graylog REST API.
            username (str): admin account user name.
            password (str): admin account password.
            headers (dict): extra HTTP headers sent with every request.
        """
        self.base_url = base_url
        self.username = username
        self.password = password
        self.headers = headers

    @property
    def _auth(self):
        """Basic-auth credentials tuple passed to `requests`."""
        return (self.username, self.password)

    def get(self, endpoint, params=None):
        """Perform a GET request and return the Response object.

        Raises:
            requests.HTTPError: when the server answers with a 4xx/5xx status.
        """

        with requests.get(
            urllib.parse.urljoin(self.base_url, endpoint),
            params=params,
            auth=self._auth,
            headers=self.headers,
            stream=True,
        ) as result:
            result.raise_for_status()
            # FIX: consume the streamed body while the connection is still
            # open — leaving the `with` block closes it, and callers reading
            # `.text` afterwards would get no data.
            result.content  # pylint: disable=pointless-statement
            return result

    def post(self, endpoint, data):
        """Perform a POST request with a JSON body and return the response text.

        Raises:
            requests.HTTPError: when the server answers with a 4xx/5xx status.
        """

        with requests.post(
            urllib.parse.urljoin(self.base_url, endpoint),
            json=data,
            auth=self._auth,
            headers=self.headers,
        ) as result:
            result.raise_for_status()

            return result.text

    def put(self, endpoint):
        """Perform a PUT request and return the Response object.

        Raises:
            requests.HTTPError: when the server answers with a 4xx/5xx status.
        """

        with requests.put(
            urllib.parse.urljoin(self.base_url, endpoint),
            auth=self._auth,
            headers=self.headers,
        ) as result:
            result.raise_for_status()

            return result

    def get_node_id(self):
        """Return the identifier of the first node of the Graylog cluster."""

        # FIX: `get` returns a Response object, not JSON text — decode its
        # body before parsing (json.loads(Response) raises TypeError).
        return next(iter(json.loads(self.get(endpoint="/api/cluster").text)))

    def list_inputs(self):
        """Return the JSON text listing inputs created on the Graylog cluster."""

        return self.get("/api/system/inputs").text

    def launch_input(self, data):
        """Launch a new input on the Graylog cluster, return the response text."""

        return self.post("/api/system/inputs", data=data)

    def input_state(self, input_id):
        """Return the JSON text describing the state of input `input_id`."""

        return self.get(f"/api/system/inputstates/{input_id}").text

    def activate_input(self, input_id):
        """Activate a launched input identified by `input_id`."""

        return self.put(f"/api/system/inputstates/{input_id}")

    def search_logs(self, params):
        """Return the Response of a relative search with `params` parameters."""

        return self.get("/api/search/universal/relative", params=params)


class GraylogLogging(HistoryMixin, BaseLogging):
    """Graylog logging backend.

    Events read from stdin are forwarded to a Graylog GELF TCP input;
    stored events are fetched back through the Graylog search API.
    """

    # pylint: disable=too-many-arguments, too-many-instance-attributes

    name = "graylog"

    def __init__(
        self,
        host=GRAYLOG_HOST,
        port=GRAYLOG_PORT,
        username=GRAYLOG_ADMIN_USERNAME,
        password=GRAYLOG_ADMIN_PASSWORD,
        title=GRAYLOG_INPUT_TITLE,
        input_type=GRAYLOG_INPUT_TYPE,
        api_url=GRAYLOG_API_URL,
    ):
        """Instantiate the Graylog backend.

        Args:
            host (str): hostname of the Graylog GELF input.
            port (int or str): port of the Graylog GELF input.
            username (str): admin account user name for the REST API.
            password (str): admin account password for the REST API.
            title (str): title of the input to use (created if missing).
            input_type (str): Graylog type of the input to use.
            api_url (str): base URL of the Graylog REST API.
        """
        self.host = host
        self.port = port
        self.username = username
        self.password = password
        self.title = title
        self.type = input_type
        self.api_url = api_url

        # Dedicated logger whose handler ships records to Graylog in GELF.
        self.gelf_logger = logging.getLogger("gelf")
        self.gelf_logger.setLevel(logging.INFO)

        self.api = GraylogAPI(
            base_url=self.api_url,
            username=self.username,
            password=self.password,
            headers={
                # Graylog rejects state-changing requests without this header.
                "X-Requested-By": "Ralph Malph",
                "Content-Type": "application/json",
            },
        )

    @cached_property
    def input_configuration(self):
        """Return the configuration of the GELF TCP input."""

        return {
            "bind_address": self.host,
            "max_message_size": 2097152,
            "number_worker_threads": 8,
            "port": int(self.port),
            "recv_buffer_size": 1048576,
            "tls_client_auth": "disabled",
            "tls_enable": False,
            # GELF over TCP delimits messages with a null byte.
            "use_null_delimiter": True,
        }

    @cached_property
    def input(self):
        """Return the full input definition sent to the Graylog API."""

        return {
            "node": self.api.get_node_id(),
            "configuration": self.input_configuration,
            "global": False,
            "title": self.title,
            "type": self.type,
        }

    def send(self, chunk_size=10, ignore_errors=False):
        """Send logs in graylog backend (one JSON event per line).

        Ensures an input matching `self.title` exists on the cluster
        (creating and activating it when missing), then forwards every
        line read from stdin to Graylog.

        NOTE(review): `chunk_size` is only logged and `ignore_errors` is
        currently unused by this backend.
        """

        logger.debug("Logging events (chunk size: %d)", chunk_size)

        inputs = json.loads(self.api.list_inputs())["inputs"]
        title = self.input["title"]

        try:
            current_input = next(filter(lambda i: i["title"] == title, inputs))
        except StopIteration:
            # No input with this title yet: create and activate one.
            current_input = json.loads(self.api.launch_input(data=self.input))
            self.api.activate_input(input_id=current_input.get("id"))

        handler = GELFTCPSocketHandler(host=self.host, port=self.port)
        handler.setFormatter(GELFFormatter())
        self.gelf_logger.addHandler(handler)
        try:
            for event in sys.stdin:
                self.gelf_logger.info(event)
        finally:
            # FIX: detach the handler so repeated `send` calls do not stack
            # handlers on the shared "gelf" logger and emit each event
            # multiple times (also closes this call's socket ownership).
            self.gelf_logger.removeHandler(handler)

    def get(self, chunk_size=10):
        """Read chunk_size records and stream them to stdout."""

        logger.debug("Fetching events (chunk_size: %d)", chunk_size)

        # FIX: `search_logs` returns a Response object — decode its text
        # body before parsing (json.loads(Response) raises TypeError).
        messages = json.loads(
            self.api.search_logs(params={"query": "*"}).text
        )["messages"]

        events = [message["message"]["message"] for message in messages]
        chunks = [events[i : i + chunk_size] for i in range(0, len(events), chunk_size)]

        for chunk in chunks:
            for event in chunk:
                sys.stdout.buffer.write(bytes(f"{event}" + "\n", encoding="utf-8"))
Loading

0 comments on commit aa1eda6

Please sign in to comment.