From 16ff52439a3c22b48203898e18b0956eccc117aa Mon Sep 17 00:00:00 2001 From: YishaiGlasner Date: Tue, 23 Jul 2024 14:41:02 +0300 Subject: [PATCH 01/24] feat(API response): new celeryResponse. --- sefaria/client/util.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/sefaria/client/util.py b/sefaria/client/util.py index 38a4014fad..4878cbca8f 100644 --- a/sefaria/client/util.py +++ b/sefaria/client/util.py @@ -39,6 +39,9 @@ def jsonpResponse(data, callback, status=200): return HttpResponse("%s(%s)" % (callback, json.dumps(data, ensure_ascii=False)), content_type="application/javascript; charset=utf-8", charset="utf-8", status=status) +def celeryResponse(task_ids): + return jsonResponse({'task_ids': task_ids}, status=202) + def send_email(subject, message_html, from_email, to_email): msg = EmailMultiAlternatives(subject, message_html, "Sefaria ", [to_email], reply_to=[from_email]) msg.send() From 4d8fcb67d20a7bc1cdf6fd28297f0ea9718e6342 Mon Sep 17 00:00:00 2001 From: YishaiGlasner Date: Tue, 23 Jul 2024 14:44:16 +0300 Subject: [PATCH 02/24] feat(post links API): post links with celery. --- reader/views.py | 58 ++++++------------- sefaria/helper/texts/tasks.py | 40 +++++++++++++ .../sefaria_tasks_interace/history_change.py | 10 ++++ 3 files changed, 68 insertions(+), 40 deletions(-) create mode 100644 sefaria/helper/texts/tasks.py create mode 100644 sefaria/sefaria_tasks_interace/history_change.py diff --git a/reader/views.py b/reader/views.py index 8c3bf05091..0433e3da8a 100644 --- a/reader/views.py +++ b/reader/views.py @@ -20,7 +20,7 @@ from rest_framework.permissions import IsAuthenticated from django.template.loader import render_to_string from django.shortcuts import render, redirect -from django.http import Http404, QueryDict +from django.http import Http404, QueryDict, HttpResponse from django.contrib.auth.decorators import login_required from django.contrib.admin.views.decorators import staff_member_required from django.utils.encoding import iri_to_uri @@ -42,7 +42,7 @@ from sefaria.model.following import general_follow_recommendations from sefaria.model.trend import user_stats_data, site_stats_data from sefaria.client.wrapper import format_object_for_client, format_note_object_for_client, get_notes, get_links -from sefaria.client.util import jsonResponse +from sefaria.client.util import jsonResponse, celeryResponse from sefaria.history import text_history, get_maximal_collapsed_activity, top_contributors, text_at_revision, record_version_deletion, record_index_deletion from sefaria.sheets import get_sheets_for_ref, get_sheet_for_panel, annotate_user_links, trending_topics from sefaria.utils.util import text_preview, short_to_long_lang_code, epoch_time @@ -80,6 +80,7 @@ from babel import Locale from sefaria.helper.topic import update_topic, update_topic_titles from sefaria.helper.category import update_order_of_category_children, check_term +from sefaria.helper.texts.tasks import defer_save_link if USE_VARNISH: from sefaria.system.varnish.wrapper import invalidate_ref, invalidate_linked @@ -1921,18 +1922,17 @@ def links_api(request, link_id_or_ref=None): #TODO: can we distinguish between a link_id (mongo id) for POSTs and a ref for GETs? """ - def _internal_do_post(request, link, uid, **kwargs): - func = tracker.update if "_id" in link else tracker.add - # use the correct function if params indicate this is a note save - # func = save_note if "type" in j and j["type"] == "note" else save_link - #obj = func(apikey["uid"], model.Link, link, **kwargs) - obj = func(uid, Link, link, **kwargs) - try: - if USE_VARNISH: - revarnish_link(obj) - except Exception as e: - logger.error(e) - return format_object_for_client(obj) + def _internal_do_post(request, obj, uid, method, skip_check, override_preciselink): + task_ids = [] + if isinstance(obj, dict): + obj = [obj] + for link in obj: + if skip_check: + link["_skip_lang_check"] = True + if override_preciselink: + link["_override_preciselink"] = True + task_ids.append(defer_save_link(uid, link, method).id) + return task_ids def _internal_do_delete(request, link_id_or_ref, uid): obj = tracker.delete(uid, Link, link_id_or_ref, callback=revarnish_link) @@ -1958,12 +1958,12 @@ def _internal_do_delete(request, link_id_or_ref, uid): if not apikey: return jsonResponse({"error": "Unrecognized API key."}) uid = apikey["uid"] - kwargs = {"method": "API"} + method = "API" user = User.objects.get(id=apikey["uid"]) else: user = request.user uid = request.user.id - kwargs = {} + method = None _internal_do_post = csrf_protect(_internal_do_post) _internal_do_delete = staff_member_required(csrf_protect(_internal_do_delete)) @@ -1975,30 +1975,8 @@ def _internal_do_delete(request, link_id_or_ref, uid): j = json.loads(j) skip_check = request.GET.get("skip_lang_check", 0) override_preciselink = request.GET.get("override_preciselink", 0) - if isinstance(j, list): - res = [] - for i in j: - try: - if skip_check: - i["_skip_lang_check"] = True - if override_preciselink: - i["_override_preciselink"] = True - retval = _internal_do_post(request, i, uid, **kwargs) - res.append({"status": "ok. Link: {} | {} Saved".format(retval["ref"], retval["anchorRef"])}) - except Exception as e: - res.append({"error": "Link: {} | {} Error: {}".format(i["refs"][0], i["refs"][1], str(e))}) - - try: - res_slice = request.GET.get("truncate_response", None) - if res_slice: - res_slice = int(res_slice) - except Exception as e: - res_slice = None - return jsonResponse(res[:res_slice]) - else: - if skip_check: - j["_skip_lang_check"] = True - return jsonResponse(_internal_do_post(request, j, uid, **kwargs)) + task_ids = _internal_do_post(request, j, uid, method, skip_check, override_preciselink) + return celeryResponse(task_ids) if request.method == "DELETE": if not link_id_or_ref: diff --git a/sefaria/helper/texts/tasks.py b/sefaria/helper/texts/tasks.py new file mode 100644 index 0000000000..8094e3a81b --- /dev/null +++ b/sefaria/helper/texts/tasks.py @@ -0,0 +1,40 @@ +import structlog +from dataclasses import asdict +import django +django.setup() +from sefaria.sefaria_tasks_interace.history_change import LinkChange +from sefaria.model import * +import sefaria.tracker as tracker +from sefaria.client.wrapper import format_object_for_client +from sefaria.settings import CELERY_QUEUES +from sefaria.celery_setup.app import app +from sefaria.settings import USE_VARNISH +if USE_VARNISH: + from sefaria.system.varnish.wrapper import invalidate_ref + +logger = structlog.get_logger(__name__) + + +@app.task(name="web.save_link") +def save_link(raw_link_change: dict): + link = raw_link_change['raw_link'] + uid = raw_link_change['uid'] + kwargs = {} + if raw_link_change['method'] == 'API': + kwargs['method'] = raw_link_change['method'] + func = tracker.update if "_id" in link else tracker.add + # use the correct function if params indicate this is a note save + obj = func(uid, Link, link, **kwargs) + try: + if USE_VARNISH: + for ref in link.refs: + invalidate_ref(Ref(ref), purge=True) + except Exception as e: + logger.error(e) + return format_object_for_client(obj) + + +def defer_save_link(uid, raw_link: dict, method: str = 'Site'): + link_change = LinkChange(raw_link=raw_link, uid=uid, method=method) + save_signature = save_link.s(asdict(link_change)).set(queue=CELERY_QUEUES['tasks']) + return save_signature.delay() diff --git a/sefaria/sefaria_tasks_interace/history_change.py b/sefaria/sefaria_tasks_interace/history_change.py new file mode 100644 index 0000000000..901a27a9f7 --- /dev/null +++ b/sefaria/sefaria_tasks_interace/history_change.py @@ -0,0 +1,10 @@ +from dataclasses import dataclass + +@dataclass +class AbstractHistoryChange: + uid: int + method: str # ("API" or "Site") + +@dataclass +class LinkChange(AbstractHistoryChange): + raw_link: dict From bd3d965b2c9aba396ebade3d8cf2573470b01d3c Mon Sep 17 00:00:00 2001 From: YishaiGlasner Date: Tue, 23 Jul 2024 14:51:50 +0300 Subject: [PATCH 03/24] refactor(generate topic prompts API): use celeryResponse. --- reader/views.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/reader/views.py b/reader/views.py index 0433e3da8a..079399c6df 100644 --- a/reader/views.py +++ b/reader/views.py @@ -3069,6 +3069,7 @@ def topics_list_api(request): @staff_member_required def generate_topic_prompts_api(request, slug: str): if request.method == "POST": + task_ids = [] from sefaria.helper.llm.tasks import generate_and_save_topic_prompts from sefaria.helper.llm.topic_prompt import get_ref_context_hints_by_lang topic = Topic.init(slug) @@ -3076,8 +3077,8 @@ def generate_topic_prompts_api(request, slug: str): ref_topic_links = post_body.get('ref_topic_links') for lang, ref__context_hints in get_ref_context_hints_by_lang(ref_topic_links).items(): orefs, context_hints = zip(*ref__context_hints) - generate_and_save_topic_prompts(lang, topic, orefs, context_hints) - return jsonResponse({"acknowledged": True}, status=202) + task_ids.append(generate_and_save_topic_prompts(lang, topic, orefs, context_hints).id) + return celeryResponse(task_ids) return jsonResponse({"error": "This API only accepts POST requests."}) From 7af719e373994750e43840e15c0b1a09c308a5a2 Mon Sep 17 00:00:00 2001 From: YishaiGlasner Date: Tue, 30 Jul 2024 16:26:57 +0300 Subject: [PATCH 04/24] refactor(links api): tools for defering to celery only when method is API and only when setting is True: should_run_with_celery that returns bool. defer_to_celery_conditionally - decorator. PossiblyCeleryJSONResponse for retirning the right response . --- reader/views.py | 16 +++++++----- sefaria/helper/texts/tasks.py | 42 +++++++++++++++++++++++++------ sefaria/local_settings_example.py | 1 + 3 files changed, 46 insertions(+), 13 deletions(-) diff --git a/reader/views.py b/reader/views.py index 079399c6df..730b73f8ad 100644 --- a/reader/views.py +++ b/reader/views.py @@ -15,6 +15,7 @@ import os import re import uuid +from dataclasses import asdict from rest_framework.decorators import api_view, permission_classes from rest_framework.permissions import IsAuthenticated @@ -44,6 +45,7 @@ from sefaria.client.wrapper import format_object_for_client, format_note_object_for_client, get_notes, get_links from sefaria.client.util import jsonResponse, celeryResponse from sefaria.history import text_history, get_maximal_collapsed_activity, top_contributors, text_at_revision, record_version_deletion, record_index_deletion +from sefaria.sefaria_tasks_interace.history_change import LinkChange from sefaria.sheets import get_sheets_for_ref, get_sheet_for_panel, annotate_user_links, trending_topics from sefaria.utils.util import text_preview, short_to_long_lang_code, epoch_time from sefaria.utils.hebrew import hebrew_term, has_hebrew @@ -80,7 +82,7 @@ from babel import Locale from sefaria.helper.topic import update_topic, update_topic_titles from sefaria.helper.category import update_order_of_category_children, check_term -from sefaria.helper.texts.tasks import defer_save_link +from sefaria.helper.texts.tasks import save_link, PossiblyCeleryJSONResponse if USE_VARNISH: from sefaria.system.varnish.wrapper import invalidate_ref, invalidate_linked @@ -1923,7 +1925,7 @@ def links_api(request, link_id_or_ref=None): """ def _internal_do_post(request, obj, uid, method, skip_check, override_preciselink): - task_ids = [] + responses = [] if isinstance(obj, dict): obj = [obj] for link in obj: @@ -1931,8 +1933,9 @@ def _internal_do_post(request, obj, uid, method, skip_check, override_preciselin link["_skip_lang_check"] = True if override_preciselink: link["_override_preciselink"] = True - task_ids.append(defer_save_link(uid, link, method).id) - return task_ids + link_change = LinkChange(raw_link=link, uid=uid, method=method) + responses.append(save_link(asdict(link_change))) + return responses def _internal_do_delete(request, link_id_or_ref, uid): obj = tracker.delete(uid, Link, link_id_or_ref, callback=revarnish_link) @@ -1975,8 +1978,9 @@ def _internal_do_delete(request, link_id_or_ref, uid): j = json.loads(j) skip_check = request.GET.get("skip_lang_check", 0) override_preciselink = request.GET.get("override_preciselink", 0) - task_ids = _internal_do_post(request, j, uid, method, skip_check, override_preciselink) - return celeryResponse(task_ids) + responses = _internal_do_post(request, j, uid, method, skip_check, override_preciselink) + response = PossiblyCeleryJSONResponse(responses, method) + return response() if request.method == "DELETE": if not link_id_or_ref: diff --git a/sefaria/helper/texts/tasks.py b/sefaria/helper/texts/tasks.py index 8094e3a81b..cd2578a17a 100644 --- a/sefaria/helper/texts/tasks.py +++ b/sefaria/helper/texts/tasks.py @@ -1,12 +1,16 @@ import structlog from dataclasses import asdict +from functools import wraps import django + +from sefaria.client.util import celeryResponse, jsonResponse + django.setup() from sefaria.sefaria_tasks_interace.history_change import LinkChange from sefaria.model import * import sefaria.tracker as tracker from sefaria.client.wrapper import format_object_for_client -from sefaria.settings import CELERY_QUEUES +from sefaria.settings import CELERY_QUEUES, CELERY_ENABLED from sefaria.celery_setup.app import app from sefaria.settings import USE_VARNISH if USE_VARNISH: @@ -15,6 +19,36 @@ logger = structlog.get_logger(__name__) +def should_run_with_celery(from_api): + return CELERY_ENABLED and from_api + + +class PossiblyCeleryJSONResponse: + def __init__(self, data, method): + self.data = data + self.method = method + + def __call__(self, callback=None, status=200): + if should_run_with_celery(self.method == 'API'): + data = [x.id for x in self.data] + return celeryResponse(data) + return jsonResponse(self.data, status=status, callback=callback) + + +def defer_to_celery_conditionally(queue): + def decorator(func): + @wraps(func) + def wrapper(*args, **kwargs): + if should_run_with_celery(args[0]['method'] == 'API'): + signature = func.s(*args, **kwargs).set(queue=queue) + return signature.delay() + else: + return func(*args, **kwargs) + return wrapper + return decorator + + +@should_defer_to_celery(queue=CELERY_QUEUES['tasks']) @app.task(name="web.save_link") def save_link(raw_link_change: dict): link = raw_link_change['raw_link'] @@ -32,9 +66,3 @@ def save_link(raw_link_change: dict): except Exception as e: logger.error(e) return format_object_for_client(obj) - - -def defer_save_link(uid, raw_link: dict, method: str = 'Site'): - link_change = LinkChange(raw_link=raw_link, uid=uid, method=method) - save_signature = save_link.s(asdict(link_change)).set(queue=CELERY_QUEUES['tasks']) - return save_signature.delay() diff --git a/sefaria/local_settings_example.py b/sefaria/local_settings_example.py index 1d022c2fa9..8e6ec46627 100644 --- a/sefaria/local_settings_example.py +++ b/sefaria/local_settings_example.py @@ -268,6 +268,7 @@ CELERY_REDIS_BROKER_DB_NUM = 2 CELERY_REDIS_RESULT_BACKEND_DB_NUM = 3 CELERY_QUEUES = {} +CELERY_ENABLED = False # END Celery # Key which identifies the Sefaria app as opposed to a user From a89320ed32382e19c344513c04407d0b6786eca4 Mon Sep 17 00:00:00 2001 From: YishaiGlasner Date: Tue, 30 Jul 2024 16:30:50 +0300 Subject: [PATCH 05/24] chore(links api): type hint for celeryResponse. --- sefaria/client/util.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sefaria/client/util.py b/sefaria/client/util.py index 4878cbca8f..66c99b9fcc 100644 --- a/sefaria/client/util.py +++ b/sefaria/client/util.py @@ -39,7 +39,7 @@ def jsonpResponse(data, callback, status=200): return HttpResponse("%s(%s)" % (callback, json.dumps(data, ensure_ascii=False)), content_type="application/javascript; charset=utf-8", charset="utf-8", status=status) -def celeryResponse(task_ids): +def celeryResponse(task_ids: list[str]): return jsonResponse({'task_ids': task_ids}, status=202) def send_email(subject, message_html, from_email, to_email): From 16022d39c6da8dfd51a1fdf075c1eb19e78ccfd3 Mon Sep 17 00:00:00 2001 From: YishaiGlasner Date: Tue, 30 Jul 2024 16:36:38 +0300 Subject: [PATCH 06/24] fix(links api): change name of decorator. --- sefaria/helper/texts/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sefaria/helper/texts/tasks.py b/sefaria/helper/texts/tasks.py index cd2578a17a..74afd5cced 100644 --- a/sefaria/helper/texts/tasks.py +++ b/sefaria/helper/texts/tasks.py @@ -48,7 +48,7 @@ def wrapper(*args, **kwargs): return decorator -@should_defer_to_celery(queue=CELERY_QUEUES['tasks']) +@defer_to_celery_conditionally(queue=CELERY_QUEUES['tasks']) @app.task(name="web.save_link") def save_link(raw_link_change: dict): link = raw_link_change['raw_link'] From e76a75a30906df8d5af94ecd18d9fed32190ca0c Mon Sep 17 00:00:00 2001 From: YishaiGlasner Date: Thu, 8 Aug 2024 12:56:01 +0300 Subject: [PATCH 07/24] feat(post version API): post version with celery. post method for completely replace all attributes, and patch method for only replace the attributes and text nodes in the request. --- reader/views.py | 54 ++++++++++++++++++- sefaria/helper/texts/tasks.py | 9 ++++ .../sefaria_tasks_interace/history_change.py | 8 +++ sefaria/tracker.py | 49 +++++++++++++++++ sefaria/urls.py | 1 + 5 files changed, 119 insertions(+), 2 deletions(-) diff --git a/reader/views.py b/reader/views.py index 730b73f8ad..dca4d4611e 100644 --- a/reader/views.py +++ b/reader/views.py @@ -45,7 +45,7 @@ from sefaria.client.wrapper import format_object_for_client, format_note_object_for_client, get_notes, get_links from sefaria.client.util import jsonResponse, celeryResponse from sefaria.history import text_history, get_maximal_collapsed_activity, top_contributors, text_at_revision, record_version_deletion, record_index_deletion -from sefaria.sefaria_tasks_interace.history_change import LinkChange +from sefaria.sefaria_tasks_interace.history_change import LinkChange, VersionChange from sefaria.sheets import get_sheets_for_ref, get_sheet_for_panel, annotate_user_links, trending_topics from sefaria.utils.util import text_preview, short_to_long_lang_code, epoch_time from sefaria.utils.hebrew import hebrew_term, has_hebrew @@ -82,7 +82,7 @@ from babel import Locale from sefaria.helper.topic import update_topic, update_topic_titles from sefaria.helper.category import update_order_of_category_children, check_term -from sefaria.helper.texts.tasks import save_link, PossiblyCeleryJSONResponse +from sefaria.helper.texts.tasks import save_link, PossiblyCeleryJSONResponse, save_version if USE_VARNISH: from sefaria.system.varnish.wrapper import invalidate_ref, invalidate_linked @@ -1541,6 +1541,56 @@ def protected_post(request): return jsonResponse({"error": "Unsupported HTTP method."}, callback=request.GET.get("callback", None)) + +@catch_error_as_json +@csrf_exempt +def complete_version_api(request): + + def internal_do_post(): + skip_links = bool(int(request.POST.get("skip_links", 0))) + count_after = int(request.POST.get("count_after", 0)) + version_change = VersionChange(raw_version=data, uid=request.user.id, method=method, patch=patch, count_after=count_after, skip_links=skip_links) + return save_version(asdict(version_change)) + + if request.method == "POST": + patch = False + elif request.method == 'PATCH': + patch = True + else: + return jsonResponse({"error": "Unsupported HTTP method."}, callback=request.GET.get("callback", None)) + + body_unicode = request.body.decode('utf-8') + body_data = urllib.parse.parse_qs(body_unicode) + json_data = body_data.get('json')[0] + if not json_data: + return jsonResponse({"error": "Missing 'json' parameter in post data."}) + data = json.loads(json_data) + + title = data.get('title') + if not title: + return jsonResponse({"error": "Missing title in 'json' parameter."}) + + try: + index = library.get_index(title.replace('_', ' ')) + except BookNameError: + return jsonResponse({"error": f"No index named: {title}"}) + + if not request.user.is_authenticated: + key = body_data.get('apikey')[0] + if not key: + return jsonResponse({"error": "You must be logged in or use an API key to save texts."}) + apikey = db.apikeys.find_one({"key": key}) + method = 'API' + if not apikey: + return jsonResponse({"error": "Unrecognized API key."}) + else: + method = None + internal_do_post = csrf_protect(internal_do_post()) + + response = PossiblyCeleryJSONResponse([internal_do_post()], method) + return response() + + @catch_error_as_json @csrf_exempt def social_image_api(request, tref): diff --git a/sefaria/helper/texts/tasks.py b/sefaria/helper/texts/tasks.py index 74afd5cced..6974862ac0 100644 --- a/sefaria/helper/texts/tasks.py +++ b/sefaria/helper/texts/tasks.py @@ -66,3 +66,12 @@ def save_link(raw_link_change: dict): except Exception as e: logger.error(e) return format_object_for_client(obj) + +@defer_to_celery_conditionally(queue=CELERY_QUEUES['tasks']) +@app.task(name="web.save_version") +def save_version(raw_version_change: dict): + version = raw_version_change['raw_version'] + uid = raw_version_change['uid'] + patch = raw_version_change['patch'] + kwargs = {'skip_links': raw_version_change['skip_links'], 'count_after': raw_version_change['count_after']} + tracker.modify_version(uid, version, patch, **kwargs) diff --git a/sefaria/sefaria_tasks_interace/history_change.py b/sefaria/sefaria_tasks_interace/history_change.py index 901a27a9f7..d1af73218d 100644 --- a/sefaria/sefaria_tasks_interace/history_change.py +++ b/sefaria/sefaria_tasks_interace/history_change.py @@ -8,3 +8,11 @@ class AbstractHistoryChange: @dataclass class LinkChange(AbstractHistoryChange): raw_link: dict + +@dataclass +class VersionChange(AbstractHistoryChange): + raw_version: dict + patch: bool + skip_links: bool + count_after: int + diff --git a/sefaria/tracker.py b/sefaria/tracker.py index 7e68968ea7..e5cd32daf3 100644 --- a/sefaria/tracker.py +++ b/sefaria/tracker.py @@ -3,6 +3,8 @@ Accepts change requests for model objects, passes the changes to the models, and records the changes in history """ +from functools import reduce + import structlog logger = structlog.get_logger(__name__) @@ -91,6 +93,53 @@ def populate_change_map(old_text, en_tref, he_tref, _): return error_map +def modify_version(user: int, version_dict: dict, patch=True, **kwargs): + + def modify_node(jagged_array_node): + address = jagged_array_node.address()[1:] # first element is the index name + new_content = reduce(lambda x, y: x.get(y, {}), address, version_dict['chapter']) + if is_version_new: + old_content = [] + else: + old_content = reduce(lambda x, y: x.setdefault(y, {}), address, version.chapter) or [] + if (patch and new_content == {}) or old_content == new_content: + return + new_content = new_content or [] + if not is_version_new: + if address: + reduce(lambda x, y: x[y], address[:-1], version.chapter)[address[-1]] = new_content + else: + version.chapter = new_content + action = 'add' if not old_content else 'edit' + changing_texts.append({'action': action, 'oref': jagged_array_node.ref(), 'old_text': old_content, 'curr_text': new_content}) + + index_title = version_dict['title'] + lang = version_dict['language'] + version_title = version_dict['versionTitle'] + version = model.Version().load({'title': index_title, 'versionTitle': version_title, 'language': lang}) + changing_texts = [] + if version: + is_version_new = False + if not patch: + for attr in model.Version.required_attrs + model.Version.optional_attrs: + if hasattr(version, attr) and attr != 'chapter': + delattr(version, attr) + for key, value in version_dict.items(): + if key == 'chapter': + continue + else: + setattr(version, key, value) + else: + is_version_new = True + version = model.Version(version_dict) + model.Ref(index_title).index_node.visit_content(modify_node) + version.save() + + for change in changing_texts: + post_modify_text(user, change['action'], change['oref'], lang, version_title, change['old_text'], change['curr_text'], version._id, **kwargs) + count_segments(version.get_index()) + + def post_modify_text(user, action, oref, lang, vtitle, old_text, curr_text, version_id, **kwargs) -> None: model.log_text(user, action, oref, lang, vtitle, old_text, curr_text, **kwargs) if USE_VARNISH: diff --git a/sefaria/urls.py b/sefaria/urls.py index b782760b87..92a4999850 100644 --- a/sefaria/urls.py +++ b/sefaria/urls.py @@ -150,6 +150,7 @@ url(r'^api/texts/modify-bulk/(?P.+)$', reader_views.modify_bulk_text_api), url(r'^api/texts/(?P<tref>.+)/(?P<lang>\w\w)/(?P<version>.+)$', reader_views.old_text_versions_api_redirect), url(r'^api/texts/(?P<tref>.+)$', reader_views.texts_api), + url(r'^api/versions$', reader_views.complete_version_api), url(r'^api/v3/texts/(?P<tref>.+)$', api_views.Text.as_view()), url(r'^api/index/?$', reader_views.table_of_contents_api), url(r'^api/opensearch-suggestions/?$', reader_views.opensearch_suggestions_api), From d75f553d9eeef4961b65d2f8ad4698e3337dc8fe Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Wed, 14 Aug 2024 13:01:10 +0300 Subject: [PATCH 08/24] refactor(move_draft_text): change to use new api/versions --- scripts/move_draft_text.py | 51 ++++++-------------------------------- 1 file changed, 8 insertions(+), 43 deletions(-) diff --git a/scripts/move_draft_text.py b/scripts/move_draft_text.py index ab07284200..d2e599ba91 100644 --- a/scripts/move_draft_text.py +++ b/scripts/move_draft_text.py @@ -85,48 +85,12 @@ def do_copy(self): self.post_terms_from_schema() self._handle_categories() self._make_post_request_to_server(self._prepare_index_api_call(idx_title), idx_contents) - content_nodes = self._index_obj.nodes.get_leaf_nodes() + for ver in self._version_objs: found_non_empty_content = False print(ver.versionTitle.encode('utf-8')) - flags = {} - for flag in ver.optional_attrs: - if hasattr(ver, flag): - flags[flag] = getattr(ver, flag, None) - for node_num, node in enumerate(content_nodes,1): - print(node.full_title(force_update=True)) - text = JaggedTextArray(ver.content_node(node)).array() - version_payload = { - "versionTitle": ver.versionTitle, - "versionSource": ver.versionSource, - "language": ver.language, - "text": text - } - if len(text) > 0: - # only bother posting nodes that have content. - found_non_empty_content = True - if node_num == len(content_nodes): - # try: - self._make_post_request_to_server(self._prepare_text_api_call(node.full_title(force_update=True), count_after=True), version_payload) - # except: - # pass - else: - self._make_post_request_to_server(self._prepare_text_api_call( - node.full_title(force_update=True)), version_payload) - if not found_non_empty_content: - # post the last node again with dummy text, to make sure an actual version db object is created - # then post again to clear the dummy text - dummy_text = "This is a dummy text" - empty = "" - for _ in range(node.depth): - dummy_text = [dummy_text] - empty = [empty] - version_payload['text'] = dummy_text - self._make_post_request_to_server(self._prepare_text_api_call(node.full_title()), version_payload) - version_payload['text'] = empty - self._make_post_request_to_server(self._prepare_text_api_call(node.full_title()), version_payload) - if flags: - self._make_post_request_to_server(self._prepare_version_attrs_api_call(ver.title, ver.language, ver.versionTitle), flags) + self._make_post_request_to_server(self._prepare_text_api_call(), ver, params={'count_after': 1}) + if self._post_links and len(self._linkset) > 0: if self._post_links_step <= 0 or self._post_links_step > len(self._linkset): self._post_links_step = len(self._linkset) @@ -168,8 +132,8 @@ def _upload_term(self, name): def _prepare_index_api_call(self, index_title): return 'api/v2/raw/index/{}'.format(index_title.replace(" ", "_")) - def _prepare_text_api_call(self, terminal_ref, count_after=False): - return 'api/texts/{}?count_after={}&index_after=0'.format(urllib.parse.quote(terminal_ref.replace(" ", "_").encode('utf-8')), int(count_after)) + def _prepare_text_api_call(self): + return 'api/version' def _prepare_version_attrs_api_call(self, title, lang, vtitle): return "api/version/flags/{}/{}/{}".format(urllib.parse.quote(title), urllib.parse.quote(lang), urllib.parse.quote(vtitle)) @@ -177,8 +141,9 @@ def _prepare_version_attrs_api_call(self, title, lang, vtitle): def _prepare_links_api_call(self): return "api/links/" - def _make_post_request_to_server(self, url, payload): - full_url = "{}/{}".format(self._dest_server, url) + def _make_post_request_to_server(self, url, payload, params=None): + params = params or {} + full_url = f"{self._dest_server}/{url}?{urllib.parse.urlencode(params)}" jpayload = json.dumps(payload) values = {'json': jpayload, 'apikey': self._apikey} data = urllib.parse.urlencode(values).encode('utf-8') From b92e8346e6c7b06abeb3a08d8c152ebf52f758ab Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Thu, 15 Aug 2024 10:15:57 +0300 Subject: [PATCH 09/24] chore(celery): new CELERY_ENABLED in local-settings yaml files. --- .../sefaria-project/templates/configmap/local-settings-file.yaml | 1 + .../sefaria-project/templates/configmap/local-settings.yaml | 1 + 2 files changed, 2 insertions(+) diff --git a/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml b/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml index 771117bc4b..d19c3d3cba 100644 --- a/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml +++ b/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml @@ -225,6 +225,7 @@ data: SENTINEL_HEADLESS_URL = os.getenv("SENTINEL_HEADLESS_URL") SENTINEL_TRANSPORT_OPTS = json.loads(os.getenv("SENTINEL_TRANSPORT_OPTS", "{}")) SENTINEL_PASSWORD = os.getenv("SENTINEL_PASSWORD") + CELERY_ENABLED = os.getenv("CELERY_ENABLED").lower() == "true" MOBILE_APP_KEY = os.getenv("MOBILE_APP_KEY") diff --git a/helm-chart/sefaria-project/templates/configmap/local-settings.yaml b/helm-chart/sefaria-project/templates/configmap/local-settings.yaml index efab2f1294..2a6255a0d5 100644 --- a/helm-chart/sefaria-project/templates/configmap/local-settings.yaml +++ b/helm-chart/sefaria-project/templates/configmap/local-settings.yaml @@ -38,3 +38,4 @@ data: SENTINEL_HEADLESS_URL: {{ .Values.tasks.redis.sentinelURL }} SENTINEL_TRANSPORT_OPTS: {{ .Values.tasks.redis.transportOptions | toJson | quote }} {{- end }} + CELERY_ENABLED: "{{ .Values.tasks.enabled }}" From 8d079d823a224c80144675d1d18e02742fb378ce Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Wed, 21 Aug 2024 15:39:57 +0300 Subject: [PATCH 10/24] fix(move draft text): use contents for getting dict of Version, --- scripts/move_draft_text.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/scripts/move_draft_text.py b/scripts/move_draft_text.py index d2e599ba91..cc8937956c 100644 --- a/scripts/move_draft_text.py +++ b/scripts/move_draft_text.py @@ -89,7 +89,7 @@ def do_copy(self): for ver in self._version_objs: found_non_empty_content = False print(ver.versionTitle.encode('utf-8')) - self._make_post_request_to_server(self._prepare_text_api_call(), ver, params={'count_after': 1}) + self._make_post_request_to_server(self._prepare_text_api_call(), ver.contents(), params={'count_after': 1}) if self._post_links and len(self._linkset) > 0: if self._post_links_step <= 0 or self._post_links_step > len(self._linkset): From 0debc4244fa20c0c848ae91ca06490a5fc18cc97 Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Mon, 2 Sep 2024 12:34:01 +0300 Subject: [PATCH 11/24] feat(post links and version API): run with celery as chord with finally inform function. --- reader/views.py | 20 +++--- sefaria/client/util.py | 7 +- sefaria/helper/texts/tasks.py | 70 +++++++++++-------- .../sefaria_tasks_interace/history_change.py | 1 - 4 files changed, 54 insertions(+), 44 deletions(-) diff --git a/reader/views.py b/reader/views.py index 20d7172805..0764241743 100644 --- a/reader/views.py +++ b/reader/views.py @@ -82,7 +82,7 @@ from babel import Locale from sefaria.helper.topic import update_topic from sefaria.helper.category import update_order_of_category_children, check_term -from sefaria.helper.texts.tasks import save_link, PossiblyCeleryJSONResponse, save_version +from sefaria.helper.texts.tasks import save_version, save_changes, save_link if USE_VARNISH: from sefaria.system.varnish.wrapper import invalidate_ref, invalidate_linked @@ -1550,7 +1550,7 @@ def internal_do_post(): skip_links = bool(int(request.POST.get("skip_links", 0))) count_after = int(request.POST.get("count_after", 0)) version_change = VersionChange(raw_version=data, uid=request.user.id, method=method, patch=patch, count_after=count_after, skip_links=skip_links) - return save_version(asdict(version_change)) + return save_changes([asdict(version_change)], save_version, method) if request.method == "POST": patch = False @@ -1587,8 +1587,7 @@ def internal_do_post(): method = None internal_do_post = csrf_protect(internal_do_post()) - response = PossiblyCeleryJSONResponse([internal_do_post()], method) - return response() + return internal_do_post() @catch_error_as_json @@ -1978,14 +1977,15 @@ def _internal_do_post(request, obj, uid, method, skip_check, override_preciselin responses = [] if isinstance(obj, dict): obj = [obj] - for link in obj: + links = [] + for l, link in enumerate(obj): if skip_check: link["_skip_lang_check"] = True if override_preciselink: link["_override_preciselink"] = True - link_change = LinkChange(raw_link=link, uid=uid, method=method) - responses.append(save_link(asdict(link_change))) - return responses + links.append(asdict(LinkChange(raw_link=link, uid=uid, method=method))) + + return save_changes(links, save_link, method) def _internal_do_delete(request, link_id_or_ref, uid): obj = tracker.delete(uid, Link, link_id_or_ref, callback=revarnish_link) @@ -2028,9 +2028,7 @@ def _internal_do_delete(request, link_id_or_ref, uid): j = json.loads(j) skip_check = request.GET.get("skip_lang_check", 0) override_preciselink = request.GET.get("override_preciselink", 0) - responses = _internal_do_post(request, j, uid, method, skip_check, override_preciselink) - response = PossiblyCeleryJSONResponse(responses, method) - return response() + return _internal_do_post(request, j, uid, method, skip_check, override_preciselink) if request.method == "DELETE": if not link_id_or_ref: diff --git a/sefaria/client/util.py b/sefaria/client/util.py index 66c99b9fcc..2119f88ba0 100644 --- a/sefaria/client/util.py +++ b/sefaria/client/util.py @@ -39,8 +39,11 @@ def jsonpResponse(data, callback, status=200): return HttpResponse("%s(%s)" % (callback, json.dumps(data, ensure_ascii=False)), content_type="application/javascript; charset=utf-8", charset="utf-8", status=status) -def celeryResponse(task_ids: list[str]): - return jsonResponse({'task_ids': task_ids}, status=202) +def celeryResponse(task_id: str, sub_task_ids: list[str] = None): + data = {'task_id': task_id} + if sub_task_ids: + data['sub_task_ids'] = sub_task_ids + return jsonResponse(data, status=202) def send_email(subject, message_html, from_email, to_email): msg = EmailMultiAlternatives(subject, message_html, "Sefaria <hello@sefaria.org>", [to_email], reply_to=[from_email]) diff --git a/sefaria/helper/texts/tasks.py b/sefaria/helper/texts/tasks.py index 6974862ac0..4f53685ec4 100644 --- a/sefaria/helper/texts/tasks.py +++ b/sefaria/helper/texts/tasks.py @@ -1,12 +1,14 @@ +import traceback +import uuid import structlog -from dataclasses import asdict from functools import wraps import django - +from celery import chord +from collections import Counter from sefaria.client.util import celeryResponse, jsonResponse +from sefaria.system.exceptions import DuplicateRecordError django.setup() -from sefaria.sefaria_tasks_interace.history_change import LinkChange from sefaria.model import * import sefaria.tracker as tracker from sefaria.client.wrapper import format_object_for_client @@ -22,34 +24,44 @@ def should_run_with_celery(from_api): return CELERY_ENABLED and from_api - -class PossiblyCeleryJSONResponse: - def __init__(self, data, method): - self.data = data - self.method = method - - def __call__(self, callback=None, status=200): - if should_run_with_celery(self.method == 'API'): - data = [x.id for x in self.data] - return celeryResponse(data) - return jsonResponse(self.data, status=status, callback=callback) - - -def defer_to_celery_conditionally(queue): - def decorator(func): - @wraps(func) - def wrapper(*args, **kwargs): - if should_run_with_celery(args[0]['method'] == 'API'): - signature = func.s(*args, **kwargs).set(queue=queue) - return signature.delay() +def save_changes(changes, func, method): + if should_run_with_celery(method == 'API'): + main_task_id = str(uuid.uuid4()) + tasks = [save_change.s(func.__name__, c).set(queue=CELERY_QUEUES['tasks']) for c in changes] + job = chord(tasks, inform.s(main_task_id=main_task_id).set(queue=CELERY_QUEUES['tasks']))(task_id=main_task_id) + tasks_ids = [task.id for task in job.parent.results] + return celeryResponse(job.id, tasks_ids) + else: + results = [] + for change in changes: + try: + func(change) + except Exception as e: + results.append({'error': f'Object: {change}. Error: {e}'}) else: - return func(*args, **kwargs) - return wrapper - return decorator + results.append({'status': 'ok'}) + return jsonResponse(results) + +@app.task(name="web.save_change") +def save_change(func_name, raw_history_change): + function_names = {'save_link': save_link, 'save_version': save_version} + func = function_names[func_name] + try: + func(raw_history_change) + return 'Success' + except Exception as e: + logger.error(f'''Error: + change: {raw_history_change} + {traceback.format_exc()}''') + if isinstance(e, DuplicateRecordError): + return 'DuplicateRecordError' + else: + return repr(e) +@app.task(name="web.inform") +def inform(results, main_task_id): + print(Counter(results)) -@defer_to_celery_conditionally(queue=CELERY_QUEUES['tasks']) -@app.task(name="web.save_link") def save_link(raw_link_change: dict): link = raw_link_change['raw_link'] uid = raw_link_change['uid'] @@ -67,8 +79,6 @@ def save_link(raw_link_change: dict): logger.error(e) return format_object_for_client(obj) -@defer_to_celery_conditionally(queue=CELERY_QUEUES['tasks']) -@app.task(name="web.save_version") def save_version(raw_version_change: dict): version = raw_version_change['raw_version'] uid = raw_version_change['uid'] diff --git a/sefaria/sefaria_tasks_interace/history_change.py b/sefaria/sefaria_tasks_interace/history_change.py index d1af73218d..3c91df8dcf 100644 --- a/sefaria/sefaria_tasks_interace/history_change.py +++ b/sefaria/sefaria_tasks_interace/history_change.py @@ -15,4 +15,3 @@ class VersionChange(AbstractHistoryChange): patch: bool skip_links: bool count_after: int - From 4e15bda3272bcd8733a9b5d48792c7789daa4ff7 Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Mon, 2 Sep 2024 13:14:36 +0300 Subject: [PATCH 12/24] feat(post links and version API): send message to slack. --- requirements.txt | 1 + sefaria/helper/slack/__init__.py | 0 sefaria/helper/slack/send_message.py | 20 ++++++++++++++++++++ sefaria/helper/texts/tasks.py | 7 +++++-- 4 files changed, 26 insertions(+), 2 deletions(-) create mode 100644 sefaria/helper/slack/__init__.py create mode 100644 sefaria/helper/slack/send_message.py diff --git a/requirements.txt b/requirements.txt index c1219d9635..ca2b3c6f82 100644 --- a/requirements.txt +++ b/requirements.txt @@ -63,6 +63,7 @@ requests roman==3.3 selenium==3.141.0 sentry-sdk==1.26.0 +slack_sdk==3.31.0 tqdm==4.51.0 ua-parser==0.10.0 undecorated==0.3.0 diff --git a/sefaria/helper/slack/__init__.py b/sefaria/helper/slack/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/sefaria/helper/slack/send_message.py b/sefaria/helper/slack/send_message.py new file mode 100644 index 0000000000..8622b4565d --- /dev/null +++ b/sefaria/helper/slack/send_message.py @@ -0,0 +1,20 @@ +from slack_sdk import WebClient +from slack_sdk.errors import SlackApiError +import structlog +try: + from sefaria.settings import TEXT_UPLOAD_SLACK_TOKEN + client = WebClient(token=TEXT_UPLOAD_SLACK_TOKEN) +except ImportError: + client = None + +logger = structlog.get_logger(__name__) + + +def send_message(text, channel='#engineering-signal'): + try: + if client is not None: + client.chat_postMessage(channel=channel, text=text) + else: + logger.info(text) + except SlackApiError as e: + logger.error(f"Error sending message: {e.response['error']}") diff --git a/sefaria/helper/texts/tasks.py b/sefaria/helper/texts/tasks.py index 4f53685ec4..c5f6540923 100644 --- a/sefaria/helper/texts/tasks.py +++ b/sefaria/helper/texts/tasks.py @@ -1,7 +1,6 @@ import traceback import uuid import structlog -from functools import wraps import django from celery import chord from collections import Counter @@ -15,6 +14,7 @@ from sefaria.settings import CELERY_QUEUES, CELERY_ENABLED from sefaria.celery_setup.app import app from sefaria.settings import USE_VARNISH +from sefaria.helper.slack.send_message import send_message if USE_VARNISH: from sefaria.system.varnish.wrapper import invalidate_ref @@ -60,7 +60,10 @@ def save_change(func_name, raw_history_change): @app.task(name="web.inform") def inform(results, main_task_id): - print(Counter(results)) + title = f'Results for celery main task with id {main_task_id}' + results = '\n'.join([f'{k}: {v}.' for k, v in Counter(results).items()]) + message = f'{title}\n{results}' + send_message(message) def save_link(raw_link_change: dict): link = raw_link_change['raw_link'] From cf31df5116ba4194ae702ee53c7ecb17aa35819f Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Tue, 3 Sep 2024 10:52:13 +0300 Subject: [PATCH 13/24] fix(celery): generate url when password is None. --- sefaria/celery_setup/generate_config.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sefaria/celery_setup/generate_config.py b/sefaria/celery_setup/generate_config.py index e297b99e69..876249b834 100644 --- a/sefaria/celery_setup/generate_config.py +++ b/sefaria/celery_setup/generate_config.py @@ -37,7 +37,7 @@ def add_db_num_to_url(url, port, db_num): def add_password_to_url(url, password): - if len(password) == 0: + if not password: return url return re.sub(r'((?:redis|sentinel)://)', fr'\1:{password}@', url) From 508bbb838e4f91630648a8869580c5b5a54830e2 Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Tue, 3 Sep 2024 13:22:22 +0300 Subject: [PATCH 14/24] fix(version API): add /? to endpoint. --- sefaria/urls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sefaria/urls.py b/sefaria/urls.py index a23bdd957e..e1def00c57 100644 --- a/sefaria/urls.py +++ b/sefaria/urls.py @@ -150,7 +150,7 @@ url(r'^api/texts/modify-bulk/(?P<title>.+)$', reader_views.modify_bulk_text_api), url(r'^api/texts/(?P<tref>.+)/(?P<lang>\w\w)/(?P<version>.+)$', reader_views.old_text_versions_api_redirect), url(r'^api/texts/(?P<tref>.+)$', reader_views.texts_api), - url(r'^api/versions$', reader_views.complete_version_api), + url(r'^api/versions/?$', reader_views.complete_version_api), url(r'^api/v3/texts/(?P<tref>.+)$', api_views.Text.as_view()), url(r'^api/index/?$', reader_views.table_of_contents_api), url(r'^api/opensearch-suggestions/?$', reader_views.opensearch_suggestions_api), From b6f5778e09cd633a91d285b799e1a0f7e32cf75d Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Tue, 3 Sep 2024 13:22:49 +0300 Subject: [PATCH 15/24] fix(version API): fix url typo. --- scripts/move_draft_text.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/scripts/move_draft_text.py b/scripts/move_draft_text.py index cc8937956c..6b59098ded 100644 --- a/scripts/move_draft_text.py +++ b/scripts/move_draft_text.py @@ -133,10 +133,7 @@ def _prepare_index_api_call(self, index_title): return 'api/v2/raw/index/{}'.format(index_title.replace(" ", "_")) def _prepare_text_api_call(self): - return 'api/version' - - def _prepare_version_attrs_api_call(self, title, lang, vtitle): - return "api/version/flags/{}/{}/{}".format(urllib.parse.quote(title), urllib.parse.quote(lang), urllib.parse.quote(vtitle)) + return 'api/versions/' def _prepare_links_api_call(self): return "api/links/" @@ -147,6 +144,7 @@ def _make_post_request_to_server(self, url, payload, params=None): jpayload = json.dumps(payload) values = {'json': jpayload, 'apikey': self._apikey} data = urllib.parse.urlencode(values).encode('utf-8') + print(111, full_url) req = urllib.request.Request(full_url, data) try: response = urllib.request.urlopen(req) @@ -169,7 +167,7 @@ def _make_post_request_to_server(self, url, payload, params=None): parser.add_argument("-v", "--versionlist", help="pipe separated version list: lang:versionTitle. To copy all versions, simply input 'all'") parser.add_argument("-k", "--apikey", help="non default api key", default=SEFARIA_BOT_API_KEY) parser.add_argument("-d", "--destination_server", help="override destination server", default='http://eph.sefaria.org') - parser.add_argument("-l", "--links", default=0, type=int, help="Enter '1' to move manual links on this text as well, '2' to move auto links") + parser.add_argument("-l", "--links", default=0, type=int, help="Enter '1' to move only manual links, '2' to move auto links on this text as well") parser.add_argument("-s", "--step", default=-1, type=int, help="Enter step size for link posting. Size of 400 means links are posted 400 at a time.") args = parser.parse_args() print(args) From fe53430dd93d3431c110cf3cf4b5a8f3a1bede13 Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Tue, 3 Sep 2024 14:13:37 +0300 Subject: [PATCH 16/24] chore(move draft text): remove print. --- scripts/move_draft_text.py | 1 - 1 file changed, 1 deletion(-) diff --git a/scripts/move_draft_text.py b/scripts/move_draft_text.py index 6b59098ded..3425c975d3 100644 --- a/scripts/move_draft_text.py +++ b/scripts/move_draft_text.py @@ -144,7 +144,6 @@ def _make_post_request_to_server(self, url, payload, params=None): jpayload = json.dumps(payload) values = {'json': jpayload, 'apikey': self._apikey} data = urllib.parse.urlencode(values).encode('utf-8') - print(111, full_url) req = urllib.request.Request(full_url, data) try: response = urllib.request.urlopen(req) From a3d0a94de50bca5e6bfac9847cd1c3b06bf7e8de Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Tue, 3 Sep 2024 14:51:22 +0300 Subject: [PATCH 17/24] refactor(slack): send message to slack in the end of celery task via API. --- requirements.txt | 1 - sefaria/helper/slack/send_message.py | 34 ++++++++++++++-------------- sefaria/helper/texts/tasks.py | 3 +-- sefaria/local_settings_example.py | 3 +++ 4 files changed, 21 insertions(+), 20 deletions(-) diff --git a/requirements.txt b/requirements.txt index ca2b3c6f82..c1219d9635 100644 --- a/requirements.txt +++ b/requirements.txt @@ -63,7 +63,6 @@ requests roman==3.3 selenium==3.141.0 sentry-sdk==1.26.0 -slack_sdk==3.31.0 tqdm==4.51.0 ua-parser==0.10.0 undecorated==0.3.0 diff --git a/sefaria/helper/slack/send_message.py b/sefaria/helper/slack/send_message.py index 8622b4565d..e60a0af0ed 100644 --- a/sefaria/helper/slack/send_message.py +++ b/sefaria/helper/slack/send_message.py @@ -1,20 +1,20 @@ -from slack_sdk import WebClient -from slack_sdk.errors import SlackApiError -import structlog -try: - from sefaria.settings import TEXT_UPLOAD_SLACK_TOKEN - client = WebClient(token=TEXT_UPLOAD_SLACK_TOKEN) -except ImportError: - client = None +import requests +from sefaria.settings import SLACK_URL -logger = structlog.get_logger(__name__) +def send_message(channel, username, pretext, text, fallback=None, icon_emoji=':robot_face:', color="#a30200"): + post_object = { + "icon_emoji": icon_emoji, + "username": username, + "channel": channel, + "attachments": [ + { + "fallback": fallback or pretext, + "color": color, + "pretext": pretext, + "text": text + } + ] + } -def send_message(text, channel='#engineering-signal'): - try: - if client is not None: - client.chat_postMessage(channel=channel, text=text) - else: - logger.info(text) - except SlackApiError as e: - logger.error(f"Error sending message: {e.response['error']}") + requests.post(SLACK_URL, json=post_object) diff --git a/sefaria/helper/texts/tasks.py b/sefaria/helper/texts/tasks.py index c5f6540923..06aa7a6e1a 100644 --- a/sefaria/helper/texts/tasks.py +++ b/sefaria/helper/texts/tasks.py @@ -62,8 +62,7 @@ def save_change(func_name, raw_history_change): def inform(results, main_task_id): title = f'Results for celery main task with id {main_task_id}' results = '\n'.join([f'{k}: {v}.' for k, v in Counter(results).items()]) - message = f'{title}\n{results}' - send_message(message) + send_message('#engineering-signal', 'Text Upload', title, results, icon_emoji=':leafy_green:') def save_link(raw_link_change: dict): link = raw_link_change['raw_link'] diff --git a/sefaria/local_settings_example.py b/sefaria/local_settings_example.py index 883a9faf3f..7240dc4392 100644 --- a/sefaria/local_settings_example.py +++ b/sefaria/local_settings_example.py @@ -273,6 +273,9 @@ CELERY_ENABLED = False # END Celery +#Slack +SLACK_URL = '' + # Key which identifies the Sefaria app as opposed to a user # using our API outside of the app. Mainly for registration MOBILE_APP_KEY = "MOBILE_APP_KEY" From 8fc00dfbf1a0a2be52a74ead2f672e998aebb3c6 Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Tue, 3 Sep 2024 17:28:30 +0300 Subject: [PATCH 18/24] chore(slack): add SLACK_URL to yaml. --- .../templates/configmap/local-settings-file.yaml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml b/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml index 8a2876039d..7b512757cd 100644 --- a/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml +++ b/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml @@ -227,6 +227,8 @@ data: SENTINEL_TRANSPORT_OPTS = json.loads(os.getenv("SENTINEL_TRANSPORT_OPTS", "{}")) SENTINEL_PASSWORD = os.getenv("SENTINEL_PASSWORD") CELERY_ENABLED = os.getenv("CELERY_ENABLED").lower() == "true" + + SLACK_URL = os.getenv("SLACK_URL") MOBILE_APP_KEY = os.getenv("MOBILE_APP_KEY") From eee43721ebfe1050df740053b781f22ee60fe406 Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Wed, 4 Sep 2024 11:50:18 +0300 Subject: [PATCH 19/24] chore(celery): add varnish secret. --- helm-chart/sefaria-project/templates/rollout/task.yaml | 6 ++++++ helm-chart/sefaria-project/templates/rollout/web.yaml | 5 +++++ 2 files changed, 11 insertions(+) diff --git a/helm-chart/sefaria-project/templates/rollout/task.yaml b/helm-chart/sefaria-project/templates/rollout/task.yaml index 4169f418af..6c8e94e64f 100644 --- a/helm-chart/sefaria-project/templates/rollout/task.yaml +++ b/helm-chart/sefaria-project/templates/rollout/task.yaml @@ -118,6 +118,9 @@ spec: - name: elastic-cert mountPath: /etc/ssl/certs/elastic readOnly: true + - mountPath: /varnish-secret + name: varnish-secret + readOnly: true volumes: - name: local-settings configMap: @@ -129,6 +132,9 @@ spec: secret: secretName: {{ template "sefaria.secrets.elasticCertificate" . }} optional: true + - name: varnish-secret + secret: + secretName: {{ template "sefaria.secrets.varnish" . }} - name: client-secret secret: secretName: {{ template "sefaria.secrets.googleClient" . }} # needs to be checked if it's a reference object or the data object we created. diff --git a/helm-chart/sefaria-project/templates/rollout/web.yaml b/helm-chart/sefaria-project/templates/rollout/web.yaml index 1264c78ba5..57c6668d6f 100644 --- a/helm-chart/sefaria-project/templates/rollout/web.yaml +++ b/helm-chart/sefaria-project/templates/rollout/web.yaml @@ -116,6 +116,11 @@ spec: fieldPath: spec.nodeName - name: OTEL_RESOURCE_ATTRIBUTES value: k8s.container.name=app,k8s.deployment.name={{ .Values.deployEnv }}-web,k8s.namespace.name={{ .Release.Namespace }},k8s.node.name=$(OTEL_RESOURCE_ATTRIBUTES_NODE_NAME),k8s.pod.name=$(OTEL_RESOURCE_ATTRIBUTES_POD_NAME) + - name: SLACK_URL + valueFrom: + secretKeyRef: + name: { { template "sefaria.secrets.slackWebhook" . } } + key: slack-webhook {{- end }} envFrom: {{- if .Values.tasks.enabled }} From 33084a0fb26c98dc3458ddcff6e354afb9f9cd14 Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Wed, 4 Sep 2024 13:57:21 +0300 Subject: [PATCH 20/24] feat(celery): limit workers and tasks to test text upload pod overload. --- .../templates/configmap/local-settings-file.yaml | 2 ++ sefaria/celery_setup/config.py | 7 ++++--- sefaria/celery_setup/generate_config.py | 3 ++- sefaria/local_settings_example.py | 2 ++ 4 files changed, 10 insertions(+), 4 deletions(-) diff --git a/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml b/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml index 7b512757cd..b02e5e2b39 100644 --- a/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml +++ b/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml @@ -222,6 +222,8 @@ data: REDIS_PASSWORD = os.getenv("REDIS_PASSWORD") CELERY_REDIS_BROKER_DB_NUM = os.getenv("CELERY_REDIS_BROKER_DB_NUM") CELERY_REDIS_RESULT_BACKEND_DB_NUM = os.getenv("CELERY_REDIS_RESULT_BACKEND_DB_NUM") + CELERY_WORKER_CONCURRENCY = '5' + WORKER_MAX_TASKS_PER_CHILD = '50' CELERY_QUEUES = json.loads(os.getenv("CELERY_QUEUES", "{}")) SENTINEL_HEADLESS_URL = os.getenv("SENTINEL_HEADLESS_URL") SENTINEL_TRANSPORT_OPTS = json.loads(os.getenv("SENTINEL_TRANSPORT_OPTS", "{}")) diff --git a/sefaria/celery_setup/config.py b/sefaria/celery_setup/config.py index 3d93d98ce4..f5e921f92d 100644 --- a/sefaria/celery_setup/config.py +++ b/sefaria/celery_setup/config.py @@ -1,12 +1,13 @@ from sefaria.settings import (REDIS_URL, REDIS_PASSWORD, REDIS_PORT, CELERY_REDIS_BROKER_DB_NUM, - CELERY_REDIS_RESULT_BACKEND_DB_NUM, SENTINEL_HEADLESS_URL, SENTINEL_PASSWORD, - SENTINEL_TRANSPORT_OPTS) + CELERY_REDIS_RESULT_BACKEND_DB_NUM, CELERY_WORKER_CONCURRENCY, WORKER_MAX_TASKS_PER_CHILD, + SENTINEL_HEADLESS_URL, SENTINEL_PASSWORD, SENTINEL_TRANSPORT_OPTS) from sefaria.celery_setup.generate_config import generate_config, SentinelConfig, RedisConfig def generate_config_from_env(): return generate_config( - RedisConfig(REDIS_URL, REDIS_PASSWORD, REDIS_PORT, CELERY_REDIS_BROKER_DB_NUM, CELERY_REDIS_RESULT_BACKEND_DB_NUM), + RedisConfig(REDIS_URL, REDIS_PASSWORD, REDIS_PORT, CELERY_REDIS_BROKER_DB_NUM, + CELERY_REDIS_RESULT_BACKEND_DB_NUM, CELERY_WORKER_CONCURRENCY, WORKER_MAX_TASKS_PER_CHILD), SentinelConfig(SENTINEL_HEADLESS_URL, SENTINEL_PASSWORD, REDIS_PORT, SENTINEL_TRANSPORT_OPTS) ) diff --git a/sefaria/celery_setup/generate_config.py b/sefaria/celery_setup/generate_config.py index 876249b834..2093e0a9a0 100644 --- a/sefaria/celery_setup/generate_config.py +++ b/sefaria/celery_setup/generate_config.py @@ -30,7 +30,8 @@ class RedisConfig: port: str broker_db_num: str result_backend_db_num: str - + worker_concurrency: str + worker_max_tasks_per_child: str def add_db_num_to_url(url, port, db_num): return url.replace(f':{port}', f':{port}/{db_num}') diff --git a/sefaria/local_settings_example.py b/sefaria/local_settings_example.py index 7240dc4392..f7be8561c4 100644 --- a/sefaria/local_settings_example.py +++ b/sefaria/local_settings_example.py @@ -269,6 +269,8 @@ SENTINEL_TRANSPORT_OPTS = {} CELERY_REDIS_BROKER_DB_NUM = 2 CELERY_REDIS_RESULT_BACKEND_DB_NUM = 3 +CELERY_WORKER_CONCURRENCY = '5' +WORKER_MAX_TASKS_PER_CHILD = '50' CELERY_QUEUES = {} CELERY_ENABLED = False # END Celery From 5235bcf056fc5ed197495c8bae6803792fad9d99 Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Wed, 4 Sep 2024 15:55:07 +0300 Subject: [PATCH 21/24] feat(celery): revert. --- .../templates/configmap/local-settings-file.yaml | 2 -- sefaria/celery_setup/config.py | 7 +++---- sefaria/celery_setup/generate_config.py | 3 +-- sefaria/local_settings_example.py | 2 -- 4 files changed, 4 insertions(+), 10 deletions(-) diff --git a/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml b/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml index b02e5e2b39..7b512757cd 100644 --- a/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml +++ b/helm-chart/sefaria-project/templates/configmap/local-settings-file.yaml @@ -222,8 +222,6 @@ data: REDIS_PASSWORD = os.getenv("REDIS_PASSWORD") CELERY_REDIS_BROKER_DB_NUM = os.getenv("CELERY_REDIS_BROKER_DB_NUM") CELERY_REDIS_RESULT_BACKEND_DB_NUM = os.getenv("CELERY_REDIS_RESULT_BACKEND_DB_NUM") - CELERY_WORKER_CONCURRENCY = '5' - WORKER_MAX_TASKS_PER_CHILD = '50' CELERY_QUEUES = json.loads(os.getenv("CELERY_QUEUES", "{}")) SENTINEL_HEADLESS_URL = os.getenv("SENTINEL_HEADLESS_URL") SENTINEL_TRANSPORT_OPTS = json.loads(os.getenv("SENTINEL_TRANSPORT_OPTS", "{}")) diff --git a/sefaria/celery_setup/config.py b/sefaria/celery_setup/config.py index f5e921f92d..3d93d98ce4 100644 --- a/sefaria/celery_setup/config.py +++ b/sefaria/celery_setup/config.py @@ -1,13 +1,12 @@ from sefaria.settings import (REDIS_URL, REDIS_PASSWORD, REDIS_PORT, CELERY_REDIS_BROKER_DB_NUM, - CELERY_REDIS_RESULT_BACKEND_DB_NUM, CELERY_WORKER_CONCURRENCY, WORKER_MAX_TASKS_PER_CHILD, - SENTINEL_HEADLESS_URL, SENTINEL_PASSWORD, SENTINEL_TRANSPORT_OPTS) + CELERY_REDIS_RESULT_BACKEND_DB_NUM, SENTINEL_HEADLESS_URL, SENTINEL_PASSWORD, + SENTINEL_TRANSPORT_OPTS) from sefaria.celery_setup.generate_config import generate_config, SentinelConfig, RedisConfig def generate_config_from_env(): return generate_config( - RedisConfig(REDIS_URL, REDIS_PASSWORD, REDIS_PORT, CELERY_REDIS_BROKER_DB_NUM, - CELERY_REDIS_RESULT_BACKEND_DB_NUM, CELERY_WORKER_CONCURRENCY, WORKER_MAX_TASKS_PER_CHILD), + RedisConfig(REDIS_URL, REDIS_PASSWORD, REDIS_PORT, CELERY_REDIS_BROKER_DB_NUM, CELERY_REDIS_RESULT_BACKEND_DB_NUM), SentinelConfig(SENTINEL_HEADLESS_URL, SENTINEL_PASSWORD, REDIS_PORT, SENTINEL_TRANSPORT_OPTS) ) diff --git a/sefaria/celery_setup/generate_config.py b/sefaria/celery_setup/generate_config.py index 2093e0a9a0..876249b834 100644 --- a/sefaria/celery_setup/generate_config.py +++ b/sefaria/celery_setup/generate_config.py @@ -30,8 +30,7 @@ class RedisConfig: port: str broker_db_num: str result_backend_db_num: str - worker_concurrency: str - worker_max_tasks_per_child: str + def add_db_num_to_url(url, port, db_num): return url.replace(f':{port}', f':{port}/{db_num}') diff --git a/sefaria/local_settings_example.py b/sefaria/local_settings_example.py index f7be8561c4..7240dc4392 100644 --- a/sefaria/local_settings_example.py +++ b/sefaria/local_settings_example.py @@ -269,8 +269,6 @@ SENTINEL_TRANSPORT_OPTS = {} CELERY_REDIS_BROKER_DB_NUM = 2 CELERY_REDIS_RESULT_BACKEND_DB_NUM = 3 -CELERY_WORKER_CONCURRENCY = '5' -WORKER_MAX_TASKS_PER_CHILD = '50' CELERY_QUEUES = {} CELERY_ENABLED = False # END Celery From f543ba823f04e012aa33323c32d8416c3b629005 Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Wed, 4 Sep 2024 15:57:04 +0300 Subject: [PATCH 22/24] feat(celery): add acks_late. --- sefaria/helper/texts/tasks.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sefaria/helper/texts/tasks.py b/sefaria/helper/texts/tasks.py index 06aa7a6e1a..3b769dca11 100644 --- a/sefaria/helper/texts/tasks.py +++ b/sefaria/helper/texts/tasks.py @@ -42,7 +42,7 @@ def save_changes(changes, func, method): results.append({'status': 'ok'}) return jsonResponse(results) -@app.task(name="web.save_change") +@app.task(name="web.save_change", acks_late=True) def save_change(func_name, raw_history_change): function_names = {'save_link': save_link, 'save_version': save_version} func = function_names[func_name] @@ -58,7 +58,7 @@ def save_change(func_name, raw_history_change): else: return repr(e) -@app.task(name="web.inform") +@app.task(name="web.inform", acks_late=True) def inform(results, main_task_id): title = f'Results for celery main task with id {main_task_id}' results = '\n'.join([f'{k}: {v}.' for k, v in Counter(results).items()]) From 2c7baba90552e6d5f0f0a28b32d2fa3d50f8af8f Mon Sep 17 00:00:00 2001 From: Akiva Berger <akiva10b@gmail.com> Date: Thu, 5 Sep 2024 12:43:53 +0300 Subject: [PATCH 23/24] chore(text upload): ignore results of celery task Celery tasks results are important when not controlled for in logs or in a notification system likes slack, in our case we are controlling for the results, thus storing them simply adds latency to the process --- sefaria/helper/texts/tasks.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sefaria/helper/texts/tasks.py b/sefaria/helper/texts/tasks.py index 3b769dca11..f114b70cb5 100644 --- a/sefaria/helper/texts/tasks.py +++ b/sefaria/helper/texts/tasks.py @@ -42,7 +42,7 @@ def save_changes(changes, func, method): results.append({'status': 'ok'}) return jsonResponse(results) -@app.task(name="web.save_change", acks_late=True) +@app.task(name="web.save_change", acks_late=True, ignore_result=True) def save_change(func_name, raw_history_change): function_names = {'save_link': save_link, 'save_version': save_version} func = function_names[func_name] From f999791040c6511aad40c1544221f03f6546492e Mon Sep 17 00:00:00 2001 From: YishaiGlasner <ishaigla@gmail.com> Date: Mon, 9 Sep 2024 15:06:06 +0300 Subject: [PATCH 24/24] feat(post links and version: add some identifying data to celery slack message/ --- reader/views.py | 6 ++++-- sefaria/helper/texts/tasks.py | 8 ++++---- 2 files changed, 8 insertions(+), 6 deletions(-) diff --git a/reader/views.py b/reader/views.py index 0764241743..2e5e314cb5 100644 --- a/reader/views.py +++ b/reader/views.py @@ -1550,7 +1550,8 @@ def internal_do_post(): skip_links = bool(int(request.POST.get("skip_links", 0))) count_after = int(request.POST.get("count_after", 0)) version_change = VersionChange(raw_version=data, uid=request.user.id, method=method, patch=patch, count_after=count_after, skip_links=skip_links) - return save_changes([asdict(version_change)], save_version, method) + task_title = f'Version Post: {data["title"]} / {data["versionTitle"]} / {data["language"]}' + return save_changes([asdict(version_change)], save_version, method, task_title) if request.method == "POST": patch = False @@ -1985,7 +1986,8 @@ def _internal_do_post(request, obj, uid, method, skip_check, override_preciselin link["_override_preciselink"] = True links.append(asdict(LinkChange(raw_link=link, uid=uid, method=method))) - return save_changes(links, save_link, method) + task_title = f'Links Post. First link: {obj[0]["refs"][0]}-{obj[0]["refs"][1]}' + return save_changes(links, save_link, method, task_title) def _internal_do_delete(request, link_id_or_ref, uid): obj = tracker.delete(uid, Link, link_id_or_ref, callback=revarnish_link) diff --git a/sefaria/helper/texts/tasks.py b/sefaria/helper/texts/tasks.py index f114b70cb5..b5696d8a1c 100644 --- a/sefaria/helper/texts/tasks.py +++ b/sefaria/helper/texts/tasks.py @@ -24,11 +24,11 @@ def should_run_with_celery(from_api): return CELERY_ENABLED and from_api -def save_changes(changes, func, method): +def save_changes(changes, func, method, task_title=''): if should_run_with_celery(method == 'API'): main_task_id = str(uuid.uuid4()) tasks = [save_change.s(func.__name__, c).set(queue=CELERY_QUEUES['tasks']) for c in changes] - job = chord(tasks, inform.s(main_task_id=main_task_id).set(queue=CELERY_QUEUES['tasks']))(task_id=main_task_id) + job = chord(tasks, inform.s(main_task_id=main_task_id, task_title=task_title).set(queue=CELERY_QUEUES['tasks']))(task_id=main_task_id) tasks_ids = [task.id for task in job.parent.results] return celeryResponse(job.id, tasks_ids) else: @@ -59,8 +59,8 @@ def save_change(func_name, raw_history_change): return repr(e) @app.task(name="web.inform", acks_late=True) -def inform(results, main_task_id): - title = f'Results for celery main task with id {main_task_id}' +def inform(results, main_task_id, task_title): + title = f'{task_title} (celery main task id {main_task_id})' results = '\n'.join([f'{k}: {v}.' for k, v in Counter(results).items()]) send_message('#engineering-signal', 'Text Upload', title, results, icon_emoji=':leafy_green:')