Skip to content


Browse files Browse the repository at this point in the history
  • Loading branch information
jnovikov committed Dec 19, 2024
1 parent 3603668 commit d266cf4
Show file tree
Hide file tree
Showing 21 changed files with 1,127 additions and 0 deletions.
152 changes: 152 additions & 0 deletions checkers/docs/
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
#!/usr/bin/env python3
import random
import re
import string
import sys

import checklib
import requests
from checklib import *
from checklib import status

import docs_lib

".AC", ".AD", ".AE", ".AERO", ".AF", ".AG", ".AI", ".AL", ".AM", ".AN", ".AO", ".AQ", ".AR", ".ARPA", ".AS", ".ASIA",
".AT", ".AU", ".AW", ".AX", ".AZ", ".BA", ".BB", ".BD", ".BE", ".BF", ".BG", ".BH", ".BI", ".BIZ", ".BJ", ".BL", ".BM",
".BN", ".BO", ".BR", ".BS", ".BT", ".BV", ".BW", ".BY", ".BZ", ".CA", ".CAT", ".CC", ".CD", ".CF", ".CG", ".CH", ".CI",
".CK", ".CL", ".CM", ".CN", ".CO", ".COM", ".COOP", ".CR", ".CU", ".CV", ".CX", ".CY", ".CZ", ".DE", ".DJ", ".DK", ".DM",
".DO", ".DZ", ".EC", ".EDU", ".EE", ".EG", ".EH", ".ER", ".ES", ".ET", ".EU", ".FI", ".FJ", ".FK", ".FM", ".FO", ".FR",
".GA", ".GB", ".GD", ".GE", ".GF", ".GG", ".GH", ".GI", ".GL", ".GM", ".GN", ".GOV", ".GP", ".GQ", ".GR", ".GS", ".GT",
".GU", ".GW", ".GY", ".HK", ".HM", ".HN", ".HR", ".HT", ".HU", ".ID", ".IE", ".IL", ".IM", ".IN", ".INFO", ".INT", ".IO",
".IQ", ".IR", ".IS", ".IT", ".JE", ".JM", ".JO", ".JOBS", ".JP", ".KE", ".KG", ".KH", ".KI", ".KM", ".KN", ".KP", ".KR",
".KW", ".KY", ".KZ", ".LA", ".LB", ".LC", ".LI", ".LK", ".LR", ".LS", ".LT", ".LU", ".LV", ".LY", ".MA", ".MC", ".MD",
".ME", ".MF", ".MG", ".MH", ".MIL", ".MK", ".ML", ".MM", ".MN", ".MO", ".MOBI", ".MP", ".MQ", ".MR", ".MS", ".MT", ".MU",
".MUSEUM", ".MV", ".MW", ".MX", ".MY", ".MZ", ".NA", ".NAME", ".NC", ".NE", ".NET", ".NF", ".NG", ".NI", ".NL", ".NO",
".NP", ".NR", ".NU", ".NZ", ".OM", ".ORG", ".PA", ".PE", ".PF", ".PG", ".PH", ".PK", ".PL", ".PM", ".PN", ".PR", ".PRO",
".PS", ".PT", ".PW", ".PY", ".QA", ".RE", ".RO", ".RS", ".RU", ".RW", ".SA", ".SB", ".SC", ".SD", ".SE", ".SG", ".SH",
".SI", ".SJ", ".SK", ".SL", ".SM", ".SN", ".SO", ".SR", ".ST", ".SU", ".SV", ".SY", ".SZ", ".TC", ".TD", ".TEL", ".TF",
".TG", ".TH", ".TJ", ".TK", ".TL", ".TM", ".TN", ".TO", ".TP", ".TR", ".TRAVEL", ".TT", ".TV", ".TW", ".TZ", ".UA", ".UG",
".UK", ".UM", ".US", ".UY", ".UZ", ".VA", ".VC", ".VE", ".VG", ".VI", ".VN", ".VU", ".WF", ".WS"

class Checker(BaseChecker):
vulns: int = 1
timeout: int = 15
uses_attack_data: bool = True

def __init__(self, *args, **kwargs):
super(Checker, self).__init__(*args, **kwargs)
self.lib = docs_lib.DocsLib(self)
self.token_regexp = re.compile(r'^[0-9A-Za-z]{1,80}$')

def get_random_org(self):
l = rnd_string(10, alphabet=string.ascii_lowercase)
r = random.choice(LEVEL_1_DOMAINS)
return f"{l}{r}".lower()

def action(self, action, *args, **kwargs):
super(Checker, self).action(action, *args, **kwargs)
except requests.exceptions.ConnectionError:
self.cquit(Status.DOWN, 'Connection error', 'Got requests connection error')

def check(self):
session = checklib.get_initialized_session()
org = self.get_random_org()

response = self.lib.create_org(session, org)
token = response.get('token')
org_id = response.get('id')

self.assert_eq(bool(self.token_regexp.fullmatch(token)), True, 'Invalid token format')

u, p = rnd_username(), rnd_password()
u1, p1 = rnd_username(), rnd_password()
self.lib.create_user(session, u, p, token)
u = f'{u}@{org}'

session = self.lib.login(session, u, p)

title = rnd_string(10)
content = rnd_string(10)

got_doc = self.lib.create_doc(session, title, content)
got_doc = self.lib.get_doc(session, got_doc.get('id'))

self.lib.create_user(session, u1, p1, token)
u1 = f'{u1}@{org}'

session_alter = checklib.get_initialized_session()
self.lib.login(session_alter, u1, p1)

got_alter_doc = self.lib.get_doc(session_alter, got_doc.get('id'))
self.assert_eq(got_alter_doc.get('title'), title, 'Failed to get document')
self.assert_eq(got_alter_doc.get('content'), content, 'Failed to get document')

new_title = rnd_string(10)
self.lib.update_doc(session, got_doc.get('id'), title=new_title)

got_updated_doc = self.lib.get_doc(session, got_doc.get('id'))
self.assert_eq(got_updated_doc.get('title'), new_title, 'Failed to update document')
self.assert_eq(got_updated_doc.get('content'), content, 'Failed to update document')

search_results =, new_title)
self.assert_in(got_updated_doc.get('id'), [x.get('id') for x in search_results], 'Failed to search document')
self.assert_in(got_updated_doc.get('title'), [x.get('title') for x in search_results],
'Failed to search document')
self.assert_in(got_updated_doc.get('content'), [x.get('content') for x in search_results],
'Failed to search document')


def put(self, flag_id: str, flag: str, vuln: str):
session = checklib.get_initialized_session()
org_id = self.get_random_org()

response = self.lib.create_org(session, org_id)
token = response.get('token')

self.assert_eq(bool(self.token_regexp.fullmatch(token)), True, 'Invalid token format')

u, p = rnd_username(), rnd_password()
self.lib.create_user(session, u, p, token)

sess = checklib.get_initialized_session()
u = f'{u}@{org_id}'
self.lib.login(sess, u, p)
title = checklib.rnd_string(10)
created_doc = self.lib.create_doc(sess, title, flag)

doc_id = created_doc.get('id')
self.assert_eq(bool(self.token_regexp.fullmatch(doc_id)), True, 'Invalid docid format')

self.cquit(Status.OK, doc_id, f"{token}:{u}:{p}:{doc_id}")

def get(self, flag_id: str, flag: str, vuln: str):
token, u, p, doc_id = flag_id.split(':')
sess = checklib.get_initialized_session()
self.lib.login(sess, u, p, status=status.Status.CORRUPT)
doc = self.lib.get_doc(sess, doc_id, status=status.Status.CORRUPT)
self.assert_eq(doc.get('content'), flag, 'Invalid content', status=status.Status.CORRUPT)

sess = checklib.get_initialized_session()
u1, p1 = rnd_username(), rnd_password()
created_user = self.lib.create_user(sess, u1, p1, token)

sess = checklib.get_initialized_session()
self.lib.login(sess, created_user.get('email'), created_user.get('password')), '', status=status.Status.CORRUPT)


if __name__ == '__main__':
c = Checker(sys.argv[2])

c.action(sys.argv[1], *sys.argv[3:])
except c.get_check_finished_exception() as e:
cquit(status.Status(c.status), c.public, c.private)
141 changes: 141 additions & 0 deletions checkers/docs/
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
from typing import Optional

import checklib
from checklib import BaseChecker
import requests

PORT = 8000

class DocsLib:
def api_url(self):
return f'http://{}:{self.port}/api'

def __init__(self, checker: BaseChecker, port=PORT, host=None):
self.c = checker
self.port = port = host or

def create_org(self, session: requests.Session, domain: str):
document = {
"domain": domain,

resp =
self.c.assert_eq(resp.status_code, 200, 'Failed to create organization')
response_data = self.c.get_json(resp, 'Failed to create organization: invalid JSON')
self.c.assert_eq(type(response_data), dict, 'Failed to create organization: invalid JSON')
return response_data

def list_orgs(self, session: requests.Session):
resp = session.get(
self.c.assert_eq(resp.status_code, 200, 'Failed to list organization')
return self.c.get_json(resp, 'Failed to list organization: invalid JSON')

def create_user(self, session: requests.Session, username: str, password: str, token: str):
document = {
"username": username,
"password": password,
"token": token
resp =
self.c.assert_eq(resp.status_code, 200, 'Failed to create user')
return self.c.get_json(resp, 'Failed to create user: invalid JSON')

def login(self, session: requests.Session, username: str, password: str, status: checklib.Status = checklib.Status.MUMBLE):
document = {
"email": username,
"password": password

response =
self.c.assert_eq(response.status_code, 200, 'Failed to login', status=status)
resp_json = self.c.get_json(response, 'Failed to login: invalid JSON', status=status)
self.c.assert_eq(type(resp_json), dict, 'Failed to login: invalid JSON', status=status)
token = resp_json.get('token') or ''
session.headers['Authorization'] = f"Bearer {token}"
return session

def get_user(self, session: requests.Session, status: checklib.Status = checklib.Status.MUMBLE):
response = session.get(

self.c.assert_eq(response.status_code, 200, 'Failed to get user', status=status)
return self.c.get_json(response, 'Failed to get user: invalid JSON', status=status)

def create_doc(self, session: requests.Session, title: str, content: str, status: checklib.Status = checklib.Status.MUMBLE):
document = {
"title": title,
"content": content

response =
self.c.assert_eq(response.status_code, 200, 'Failed to create document', status=status)
return self.c.get_json(response, 'Failed to create document: invalid JSON', status=status)

def update_doc(self, session: requests.Session, doc_id:str, title: str | None, content: str | None = None,
status: checklib.Status = checklib.Status.MUMBLE):
document = {}
if title:
document['title'] = title
if content:
document['content'] = content

response = session.patch(
self.c.assert_eq(response.status_code, 200, 'Failed to update document', status=status)
return self.c.get_json(response, 'Failed to create document: invalid JSON', status=status)

def get_doc(self, session: requests.Session, doc_id: str, status: checklib.Status = checklib.Status.MUMBLE):
response = session.get(
self.c.assert_eq(response.status_code, 200, 'Failed to get document', status=status)
return self.c.get_json(response, 'Failed to get document: invalid JSON', status=status)

def delete_doc(self, session: requests.Session, doc_id: str, status: checklib.Status = checklib.Status.MUMBLE):
response = session.delete(
self.c.assert_eq(response.status_code, 200, 'Failed to delete document', status=status)

def search(self, session: requests.Session, query: str, status: checklib.Status = checklib.Status.MUMBLE):
response = session.get(f"{self.api_url}/documents",
params={'query': query}
self.c.assert_eq(response.status_code, 200, 'Failed to search', status=status)
return self.c.get_json(response, 'Failed to search: invalid JSON', status=status)

def document_get_txt(self, session: requests.Session, doc_id: str, status: checklib.Status = checklib.Status.MUMBLE):
response = session.get(
self.c.assert_eq(response.status_code, 200, 'Failed to get txt', status=status)
return response.text

1 change: 1 addition & 0 deletions services/docs/api/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
15 changes: 15 additions & 0 deletions services/docs/api/Dockerfile
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
FROM python:3.12-slim-bullseye

RUN apt update && apt install -y xxd
ADD src/requirements.txt requirements.txt

RUN pip3 install -r requirements.txt

COPY app.env app.env

RUN sed -i "s/JWT_KEY=.*/JWT_KEY=$(xxd -u -l 20 -p /dev/urandom)/g" app.env

COPY src .

CMD fastapi run app/
3 changes: 3 additions & 0 deletions services/docs/api/app.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Empty file.

0 comments on commit d266cf4

Please sign in to comment.