Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add JwtRestrictedApplication check to XBlock callback #794

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions lms/djangoapps/courseware/module_render.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
from edx_proctoring.api import get_attempt_status_summary
from edx_proctoring.services import ProctoringService
from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication
from edx_rest_framework_extensions.permissions import JwtRestrictedApplication
from edx_when.field_data import DateLookupFieldData
from eventtracking import tracker
from opaque_keys import InvalidKeyError
Expand Down Expand Up @@ -982,12 +983,19 @@ def handle_xblock_callback(request, course_id, usage_id, handler, suffix=None):
)
else:
if user_auth_tuple is not None:
request.user, _ = user_auth_tuple
# When using JWT authentication, the second element contains the JWT token. We need it to determine
# whether the application that issued the token is restricted.
request.user, request.auth = user_auth_tuple
# This is verified by the `JwtRestrictedApplication` before it decodes the token.
request.successful_authenticator = authenticator
break

# NOTE (CCB): Allow anonymous GET calls (e.g. for transcripts). Modifying this view is simpler than updating
# the XBlocks to use `handle_xblock_callback_noauth`, which is practically identical to this view.
if request.method != 'GET' and not (request.user and request.user.is_authenticated):
# Block all request types coming from restricted applications.
if (
request.method != 'GET' and not (request.user and request.user.is_authenticated)
) or JwtRestrictedApplication().has_permission(request, None): # type: ignore
return HttpResponseForbidden('Unauthenticated')

request.user.known = request.user.is_authenticated
Expand Down
22 changes: 21 additions & 1 deletion lms/djangoapps/courseware/tests/test_module_render.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@
from lms.djangoapps.lms_xblock.field_data import LmsFieldData
from openedx.core.djangoapps.credit.api import set_credit_requirement_status, set_credit_requirements
from openedx.core.djangoapps.credit.models import CreditCourse
from openedx.core.djangoapps.oauth_dispatch.jwt import create_jwt_for_user
from openedx.core.djangoapps.oauth_dispatch.jwt import _create_jwt, create_jwt_for_user
from openedx.core.djangoapps.oauth_dispatch.tests.factories import AccessTokenFactory, ApplicationFactory
from openedx.core.lib.courses import course_image_url
from openedx.core.lib.gating import api as gating_api
Expand Down Expand Up @@ -363,6 +363,26 @@ def test_jwt_authentication(self):
response = self.client.post(dispatch_url, {}, **headers)
assert 200 == response.status_code

def test_jwt_authentication_with_restricted_application(self):
"""Test that the XBlock endpoint disallows JWT authentication with restricted applications."""

def _mock_create_restricted_jwt(*args, **kwargs):
"""Pass an additional argument to `_create_jwt` without modifying the signature of `create_jwt_for_user`."""
kwargs['is_restricted'] = True
return _create_jwt(*args, **kwargs)

with patch('openedx.core.djangoapps.oauth_dispatch.jwt._create_jwt', _mock_create_restricted_jwt):
token = create_jwt_for_user(self.mock_user)

dispatch_url = self._get_dispatch_url()
headers = {'HTTP_AUTHORIZATION': 'JWT ' + token}

response = self.client.get(dispatch_url, {}, **headers)
assert 403 == response.status_code

response = self.client.post(dispatch_url, {}, **headers)
assert 403 == response.status_code

def test_missing_position_handler(self):
"""
Test that sending POST request without or invalid position argument don't raise server error
Expand Down
Loading