Skip to content

Commit

Permalink
🥅 Catch HAProxy errors, Better HAProxy config
Browse files Browse the repository at this point in the history
Signed-off-by: Muhammed Hussein Karimi <[email protected]>
  • Loading branch information
mhkarimi1383 committed Oct 29, 2024
1 parent e611e74 commit 55d920c
Show file tree
Hide file tree
Showing 5 changed files with 111 additions and 80 deletions.
7 changes: 5 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ frontend redis_master
default_backend redis_master
backend redis_master
balance source
hash-type consistent
```

Is enough
Expand All @@ -35,7 +37,7 @@ To run this project install `poetry` (and run `poetry install`) or use `direnv`
[env var: `SENTINEL_PORT`]
[default: `26379`]

- `--sentinel-password` `-p` `TEXT`
- `--sentinel-password` `-P` `TEXT`
Sentinel Password
[env var: `SENTINEL_PASSWORD`]
[default: `None`]
Expand Down Expand Up @@ -81,4 +83,5 @@ With the help of Redis Pub/Sub we are able to understand master changes live tha

## TODO

* [ ] Add error handling for HAProxy commands
* [x] Add error handling for HAProxy commands

2 changes: 2 additions & 0 deletions haproxy.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ frontend redis_master
default_backend redis_master

backend redis_master
balance source
hash-type consistent
114 changes: 36 additions & 78 deletions haproxy_redis_sentinel/cli.py
Original file line number Diff line number Diff line change
@@ -1,79 +1,22 @@
import typer
from rich.console import Console
from datetime import datetime
import redis
from redis.retry import Retry
from enum import StrEnum
from haproxy_redis_sentinel.logging import info
from haproxy_redis_sentinel.utils import send_command
from redis.backoff import ExponentialBackoff
from redis.retry import Retry
from typing import Annotated, Any
import socket
import redis
import typer


__all__ = ["app"]

app = typer.Typer()

err_console = Console(stderr=True)
out_console = Console(stderr=False)


def encode_command(command: str) -> bytes:
return f"{command};\n".encode("utf-8")


def log_prefix() -> str:
return f"{datetime.now().strftime("%c")} "


def info(msg):
typer.echo(
log_prefix() + typer.style(
msg,
fg=typer.colors.GREEN,
bold=True
),
err=False
)


def error(msg: str):
typer.echo(
log_prefix() + typer.style(
msg,
fg=typer.colors.WHITE,
bg=typer.colors.RED,
bold=True
),
err=True
)


def recvall(sock: socket.socket):
BUFF_SIZE = 1024
data = b''
while True:
part = sock.recv(BUFF_SIZE)
data += part
print(part)
if len(part) < BUFF_SIZE:
# either 0 or end of data
break
return data


def send_command(addr: str, command: str) -> str:
if len(addr.split(":")) == 2:
family = socket.AF_INET
addr_parts = addr.split(":")
a = (addr_parts[0], int(addr_parts[1]))
else:
a = addr
family = socket.AF_UNIX
unix_socket = socket.socket(family, socket.SOCK_STREAM)
unix_socket.settimeout(10)
unix_socket.connect(a)

unix_socket.send(encode_command(command))

return recvall(unix_socket).decode("utf-8")
class HAProxyOutput(StrEnum):
SERVER_REGISTERED = "New server registered."
SERVER_DELETED = "Server deleted."
SERVER_NOT_FOUND = "No such server."


@app.command()
Expand All @@ -100,7 +43,7 @@ def run(
str | None,
typer.Option(
"--sentinel-password",
"-p",
"-P",
help="Sentinel Password",
envvar="SENTINEL_PASSWORD",
hide_input=True,
Expand Down Expand Up @@ -149,18 +92,31 @@ def run(
)
address = None
sentinel_info: dict[str, Any] = conn.info() # type: ignore
for k in sentinel_info.keys():
if k.startswith("master") and sentinel_info[k]["name"] == master_name:
address = sentinel_info[k]["address"]
info(address)
info(send_command(haproxy_socket, f"del server {
haproxy_backend}/{haproxy_server_name}"))
info(send_command(haproxy_socket,
f"add server {haproxy_backend}/{haproxy_server_name} {address}"))
try:
master_id = [k for k in sentinel_info.keys()
if k.startswith("master") and
sentinel_info[k]["name"] == master_name][1]
except IndexError:
raise Exception("Unable to find given master by name")
address = sentinel_info[master_id]["address"]
info(f"Setting initial master address: {address}")

# Remove server in case of restarts
out = send_command(haproxy_socket, f"del server {
haproxy_backend}/{haproxy_server_name}")
if out not in {HAProxyOutput.SERVER_DELETED,
HAProxyOutput.SERVER_NOT_FOUND}:
raise Exception(f"Error while removing old server: {out}")
out = send_command(haproxy_socket,
f"add server {haproxy_backend}/{haproxy_server_name} {address}") # noqa: E501
if out != HAProxyOutput.SERVER_REGISTERED:
raise Exception(f"Error while adding initial server: {out}")
info(out)
pubsub = conn.pubsub()
pubsub.subscribe("+switch-master")
for message in pubsub.listen():
if not isinstance(message["data"], str):
info("Skipping initial message in Pub/Sub")
continue
data: list[str] = message["data"].split(" ")
if data[0] != master_name:
Expand All @@ -169,8 +125,10 @@ def run(
host = data[3]
port = data[4]
info("Master Changed, Terminating clients")
info(send_command(haproxy_socket,
"set server redis_master/current_master state maint"))
info(send_command(haproxy_socket,
"shutdown sessions server redis_master/current_master"))
info(f"Switching to new master Host: {host}, Port: {port}")
info(send_command(haproxy_socket,
f"set {haproxy_backend}/{haproxy_server_name} addr {host} port {port}")) # noqa: E501
f"set server {haproxy_backend}/{haproxy_server_name} addr {host} port {port}")) # noqa: E501
32 changes: 32 additions & 0 deletions haproxy_redis_sentinel/logging.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
from datetime import datetime
import typer


def log_prefix() -> str:
return f"{datetime.now().strftime("%c")} "


def info(msg):
typer.echo(
log_prefix() + typer.style(
msg,
fg=typer.colors.GREEN,
bold=True
),
err=False
)


def error(msg: str):
typer.echo(
log_prefix() + typer.style(
msg,
fg=typer.colors.WHITE,
bg=typer.colors.RED,
bold=True
),
err=True
)


__all__ = ["info", "error"]
36 changes: 36 additions & 0 deletions haproxy_redis_sentinel/utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
import socket

__all__ = ["send_command"]


def encode_command(command: str) -> bytes:
return f"{command};\n".encode("utf-8")


def recvall(sock: socket.socket):
BUFF_SIZE = 1024
data = b''
while True:
part = sock.recv(BUFF_SIZE)
data += part
if len(part) < BUFF_SIZE:
# either 0 or end of data
break
return data


def send_command(addr: str, command: str) -> str:
if len(addr.split(":")) == 2:
addr_parts = addr.split(":")
a = (addr_parts[0], int(addr_parts[1]))
family = socket.AF_INET
else:
a = addr
family = socket.AF_UNIX
unix_socket = socket.socket(family, socket.SOCK_STREAM)
unix_socket.settimeout(10)
unix_socket.connect(a)

unix_socket.send(encode_command(command))

return recvall(unix_socket).decode("utf-8").strip()

0 comments on commit 55d920c

Please sign in to comment.