From f246d66d587bbc75c93f4f56c2f5e6ac6dc585d7 Mon Sep 17 00:00:00 2001 From: Piotr Surowiec Date: Tue, 16 Jan 2024 01:39:45 +0100 Subject: [PATCH] Merge pull request #34048 from open-craft/agrendalath/xblock-callback-jwt-restricted-application-check-palm fix(palm): add `JwtRestrictedApplication` check to XBlock callback --- lms/djangoapps/courseware/block_render.py | 12 ++++++++-- .../courseware/tests/test_block_render.py | 22 ++++++++++++++++++- 2 files changed, 31 insertions(+), 3 deletions(-) diff --git a/lms/djangoapps/courseware/block_render.py b/lms/djangoapps/courseware/block_render.py index e9cf15678e..a741e3219f 100644 --- a/lms/djangoapps/courseware/block_render.py +++ b/lms/djangoapps/courseware/block_render.py @@ -27,6 +27,7 @@ from edx_proctoring.api import get_attempt_status_summary from edx_proctoring.services import ProctoringService from edx_rest_framework_extensions.auth.jwt.authentication import JwtAuthentication +from edx_rest_framework_extensions.permissions import JwtRestrictedApplication from edx_when.field_data import DateLookupFieldData from eventtracking import tracker from opaque_keys import InvalidKeyError @@ -841,12 +842,19 @@ def handle_xblock_callback(request, course_id, usage_id, handler, suffix=None): ) else: if user_auth_tuple is not None: - request.user, _ = user_auth_tuple + # When using JWT authentication, the second element contains the JWT token. We need it to determine + # whether the application that issued the token is restricted. + request.user, request.auth = user_auth_tuple + # This is verified by the `JwtRestrictedApplication` before it decodes the token. + request.successful_authenticator = authenticator break # NOTE (CCB): Allow anonymous GET calls (e.g. for transcripts). Modifying this view is simpler than updating # the XBlocks to use `handle_xblock_callback_noauth`, which is practically identical to this view. - if request.method != 'GET' and not (request.user and request.user.is_authenticated): + # Block all request types coming from restricted applications. + if ( + request.method != 'GET' and not (request.user and request.user.is_authenticated) + ) or JwtRestrictedApplication().has_permission(request, None): # type: ignore return HttpResponseForbidden('Unauthenticated') request.user.known = request.user.is_authenticated diff --git a/lms/djangoapps/courseware/tests/test_block_render.py b/lms/djangoapps/courseware/tests/test_block_render.py index 53e10b5c51..b0c14e3dbf 100644 --- a/lms/djangoapps/courseware/tests/test_block_render.py +++ b/lms/djangoapps/courseware/tests/test_block_render.py @@ -81,7 +81,7 @@ from lms.djangoapps.lms_xblock.field_data import LmsFieldData from openedx.core.djangoapps.credit.api import set_credit_requirement_status, set_credit_requirements from openedx.core.djangoapps.credit.models import CreditCourse -from openedx.core.djangoapps.oauth_dispatch.jwt import create_jwt_for_user +from openedx.core.djangoapps.oauth_dispatch.jwt import _create_jwt, create_jwt_for_user from openedx.core.djangoapps.oauth_dispatch.tests.factories import AccessTokenFactory, ApplicationFactory from openedx.core.lib.courses import course_image_url from openedx.core.lib.gating import api as gating_api @@ -375,6 +375,26 @@ def test_jwt_authentication(self): response = self.client.post(dispatch_url, {}, **headers) assert 200 == response.status_code + def test_jwt_authentication_with_restricted_application(self): + """Test that the XBlock endpoint disallows JWT authentication with restricted applications.""" + + def _mock_create_restricted_jwt(*args, **kwargs): + """Pass an additional argument to `_create_jwt` without modifying the signature of `create_jwt_for_user`.""" + kwargs['is_restricted'] = True + return _create_jwt(*args, **kwargs) + + with patch('openedx.core.djangoapps.oauth_dispatch.jwt._create_jwt', _mock_create_restricted_jwt): + token = create_jwt_for_user(self.mock_user) + + dispatch_url = self._get_dispatch_url() + headers = {'HTTP_AUTHORIZATION': 'JWT ' + token} + + response = self.client.get(dispatch_url, {}, **headers) + assert 403 == response.status_code + + response = self.client.post(dispatch_url, {}, **headers) + assert 403 == response.status_code + def test_missing_position_handler(self): """ Test that sending POST request without or invalid position argument don't raise server error