Skip to content

Commit

Permalink
fixup! ✨(backends) add graylog service
Browse files Browse the repository at this point in the history
  • Loading branch information
quitterie-lcs committed Nov 22, 2021
1 parent 6eb5459 commit b7a07c6
Show file tree
Hide file tree
Showing 5 changed files with 163 additions and 13 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ and this project adheres to

### Added

- Implement `graylog` backend
- `graylog` logging backend
- Implement edx problem interaction events pydantic models
- Implement edx textbook interaction events pydantic models
- `ws` websocket stream backend (compatible with the `fetch` command)
Expand Down
8 changes: 6 additions & 2 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,9 @@ services:
PYLINTHOME: /app/.pylint.d
volumes:
- .:/app

networks:
default:

# -- backends
elasticsearch:
image: elasticsearch:7.13.3
Expand Down Expand Up @@ -43,8 +45,10 @@ services:
default:

graylog:
image: graylog/graylog:4.0
image: graylog/graylog:4.2.1
environment:
- GRAYLOG_PASSWORD_SECRET=somepasswordpepper
- GRAYLOG_ROOT_PASSWORD_SHA2=d74ff0ee8da3b9806b18c877dbf29bbde50b5bd8e4dad7a3a725000feb82e8f1
- GRAYLOG_HTTP_EXTERNAL_URI=http://localhost:9000/
entrypoint: /usr/bin/tini -- wait-for-it elasticsearch:9200 -- /docker-entrypoint.sh
networks:
Expand Down
151 changes: 142 additions & 9 deletions src/ralph/backends/logging/graylog.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,105 @@
"""Graylog storage backend for Ralph"""

from itertools import zip_longest
import json
import logging
import sys
from itertools import zip_longest

import requests
from logging_gelf.formatters import GELFFormatter
from logging_gelf.handlers import GELFTCPSocketHandler

from ...defaults import RALPH_GRAYLOG_HOST, RALPH_GRAYLOG_PORT
from ...defaults import (
RALPH_GRAYLOG_ADMIN_PASSWORD,
RALPH_GRAYLOG_ADMIN_USERNAME,
RALPH_GRAYLOG_HOST,
RALPH_GRAYLOG_PORT,
RALPH_GRAYLOG_EXTERNAL_PORT,
)
from ..mixins import HistoryMixin
from .base import BaseLogging

logger = logging.getLogger(__name__)


class GraylogAPI:
""" """

def __init__(self, url, username, password, headers):

self.url = url
self.username = username
self.password = password
self.headers = headers

@property
def _auth(self):
return (self.username, self.password)

def get(self, endpoint, params=None):
"""GET method."""

with requests.get(
f"{self.url}{endpoint}",
params=params,
auth=self._auth,
headers=self.headers,
) as result:

result.raise_for_status()

return result.text

def post(self, endpoint, json):
"""POST method."""

with requests.post(
f"{self.url}{endpoint}", json=json, auth=self._auth, headers=self.headers
) as result:
result.raise_for_status()

return result.text

def put(self, endpoint):
"""PUT method."""

with requests.put(
f"{self.url}{endpoint}", auth=self._auth, headers=self.headers
) as result:
result.raise_for_status()

return result

def get_node_id(self):
"""Returns node id of the Graylog cluster."""

return next(iter(json.loads(self.get(endpoint="/api/cluster"))))

def list_inputs(self):
"""Returns list of the created inputs on the Graylog cluster."""

return self.get("/api/system/inputs")

def launch_input(self, json):
"""Launches a new input on the Graylog cluster."""

return self.post("/api/system/inputs", json=json)

def input_state(self, input_id):
""" """

return self.get(f"/api/system/inputstates/{input_id}")

def activate_input(self, input_id):
"""Activates a launched input."""

return self.put(f"/api/system/inputstates/{input_id}")

def search_logs(self, params):

return self.get(f"/api/search/universal/relative", params=params)


class GraylogLogging(HistoryMixin, BaseLogging):
"""Graylog logging backend"""

Expand All @@ -25,24 +111,68 @@ def __init__(
self,
host=RALPH_GRAYLOG_HOST,
port=RALPH_GRAYLOG_PORT,
client_options=None,
external_port=RALPH_GRAYLOG_EXTERNAL_PORT,
username=RALPH_GRAYLOG_ADMIN_USERNAME,
password=RALPH_GRAYLOG_ADMIN_PASSWORD,
):
if client_options is None:
client_options = {}

self.host = host
self.port = port
self.external_port = external_port
self.username = username
self.password = password

self.gelf_logger = logging.getLogger("gelf")
self.gelf_logger.setLevel(logging.INFO)

self.api = GraylogAPI(
url=f"http://{self.host}:{self.external_port}",
username=self.username,
password=self.password,
headers={
"X-Requested-By": "Learning Analytics Playground",
"Content-Type": "application/json",
},
)

@property
def input_configuration(self):
""" """

return {
"node": self.api.get_node_id(),
"configuration": {
"bind_address": self.host,
"port": int(self.port),
"tls_enable": False,
},
"global": False,
"title": "TCP input",
"type": "org.graylog2.inputs.gelf.tcp.GELFTCPInput",
}

def check_input_exists(self, inputs, title):
"""Returns True if a given input has already been created in the Graylog cluster."""

for input in inputs:
if input["title"] == title:
return True

return False

def send(self, chunk_size, ignore_errors=False):
"""Send logs in graylog backend (one JSON event per line)."""

logger.debug("Logging events (chunk size: %d)", chunk_size)

chunks = zip_longest(*([iter(sys.stdin.readlines())] * chunk_size))

if not self.check_input_exists(
self.api.list_inputs()["inputs"], title=self.input_configuration["title"]
):
self.api.launch_input(json=self.input_configuration)

self.api.activate_input()

handler = GELFTCPSocketHandler(host=self.host, port=self.port)
handler.setFormatter(GELFFormatter())
self.gelf_logger.addHandler(handler)
Expand All @@ -54,6 +184,9 @@ def send(self, chunk_size, ignore_errors=False):
def get(self, chunk_size=10):
"""Read chunk_size records and stream them to stdout."""

msg = "Graylog storage backend is write-only, cannot read from"
logger.error(msg)
raise NotImplementedError(msg)
logger.debug("Fetching events (chunk_size: %d)", chunk_size)

messages = json.loads(self.api.search_logs(params={"query": "*"}))["messages"]

for message in messages:
sys.stdout.write(message["message"]["message"] + "\n")
3 changes: 3 additions & 0 deletions src/ralph/defaults.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,3 +138,6 @@ def load_config(config_file_path):
EXECUTION_ENVIRONMENT = config("RALPH_EXECUTION_ENVIRONMENT", "development")
RALPH_GRAYLOG_HOST = config("RALPH_GRAYLOG_HOST", "graylog")
RALPH_GRAYLOG_PORT = config("RALPH_GRAYLOG_PORT", 12201)
RALPH_GRAYLOG_EXTERNAL_PORT = config("RALPH_GRAYLOG_EXTERNAL_PORT", 9000)
RALPH_GRAYLOG_ADMIN_USERNAME = config("RALPH_GRAYLOG_ADMIN_USERNAME", "admin")
RALPH_GRAYLOG_ADMIN_PASSWORD = config("RALPH_GRAYLOG_ADMIN_PASSWORD", "pass")
12 changes: 11 additions & 1 deletion tests/backends/logging/test_graylog.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
"""Tests for Ralph graylog storage backend"""

from ralph.backends.logging.graylog import GraylogLogging
from ralph.defaults import RALPH_GRAYLOG_HOST, RALPH_GRAYLOG_PORT
from ralph.defaults import (
RALPH_GRAYLOG_ADMIN_PASSWORD,
RALPH_GRAYLOG_ADMIN_USERNAME,
RALPH_GRAYLOG_EXTERNAL_PORT,
RALPH_GRAYLOG_HOST,
RALPH_GRAYLOG_PORT,
)


def test_backends_logging_graylog_logging_instantiation():
Expand All @@ -11,8 +17,12 @@ def test_backends_logging_graylog_logging_instantiation():
storage = GraylogLogging(
host=RALPH_GRAYLOG_HOST,
port=RALPH_GRAYLOG_PORT,
external_port=RALPH_GRAYLOG_EXTERNAL_PORT,
username=RALPH_GRAYLOG_ADMIN_USERNAME,
password=RALPH_GRAYLOG_ADMIN_PASSWORD,
)

assert storage.name == "graylog"
assert storage.host == "graylog"
assert storage.port == 12201
assert storage.external_port == 9000

0 comments on commit b7a07c6

Please sign in to comment.