From 1fce83e591d7077fe53088f5aaaee8c48282b02a Mon Sep 17 00:00:00 2001 From: Enric Tobella Date: Sat, 16 Mar 2024 22:51:14 +0100 Subject: [PATCH] [IMP] automation_oca: Add some security filters --- automation_oca/models/automation_record.py | 129 ++++++++++++++++++ automation_oca/models/mail_thread.py | 28 ++++ automation_oca/tests/__init__.py | 1 + .../tests/test_automation_security.py | 92 +++++++++++++ 4 files changed, 250 insertions(+) create mode 100644 automation_oca/tests/test_automation_security.py diff --git a/automation_oca/models/automation_record.py b/automation_oca/models/automation_record.py index 59bc47a..fdba203 100644 --- a/automation_oca/models/automation_record.py +++ b/automation_oca/models/automation_record.py @@ -1,8 +1,13 @@ # Copyright 2024 Dixmit # License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). +import logging +from collections import defaultdict + from odoo import api, fields, models +_logger = logging.getLogger(__name__) + class AutomationRecord(models.Model): @@ -66,3 +71,127 @@ def _compute_resource_ref(self): def _compute_name(self): for record in self: record.name = self.env[record.model].browse(record.res_id).display_name + + @api.model + def _search( + self, + args, + offset=0, + limit=None, + order=None, + count=False, + access_rights_uid=None, + ): + ids = super()._search( + args, + offset=offset, + limit=limit, + order=order, + count=False, + access_rights_uid=access_rights_uid, + ) + if self.env.is_system(): + # restrictions do not apply to group "Settings" + return len(ids) if count else ids + + # TODO highlight orphaned EDI records in UI: + # - self.model + self.res_id are set + # - self.record returns empty recordset + # Remark: self.record is @property, not field + + if not ids: + return 0 if count else [] + orig_ids = ids + ids = set(ids) + result = [] + model_data = defaultdict( + lambda: defaultdict(set) + ) # {res_model: {res_id: set(ids)}} + for sub_ids in self._cr.split_for_in_conditions(ids): + self._cr.execute( + """ + SELECT id, res_id, model + FROM "%s" + WHERE id = ANY (%%(ids)s)""" + % self._table, + dict(ids=list(sub_ids)), + ) + for eid, res_id, model in self._cr.fetchall(): + if not model: + result.append(eid) + continue + model_data[model][res_id].add(eid) + + for model, targets in model_data.items(): + if not self.env[model].check_access_rights("read", False): + continue + recs = self.env[model].browse(list(targets)) + missing = recs - recs.exists() + if missing: + for res_id in missing.ids: + _logger.warning( + "Deleted record %s,%s is referenced by edi.exchange.record %s", + model, + res_id, + list(targets[res_id]), + ) + recs = recs - missing + allowed = ( + self.env[model] + .with_context(active_test=False) + ._search([("id", "in", recs.ids)]) + ) + for target_id in allowed: + result += list(targets[target_id]) + if len(orig_ids) == limit and len(result) < len(orig_ids): + result.extend( + self._search( + args, + offset=offset + len(orig_ids), + limit=limit, + order=order, + count=count, + access_rights_uid=access_rights_uid, + )[: limit - len(result)] + ) + # Restore original ordering + result = [x for x in orig_ids if x in result] + return len(result) if count else list(result) + + def read(self, fields=None, load="_classic_read"): + """Override to explicitely call check_access_rule, that is not called + by the ORM. It instead directly fetches ir.rules and apply them.""" + self.check_access_rule("read") + return super().read(fields=fields, load=load) + + def check_access_rule(self, operation): + """In order to check if we can access a record, we are checking if we can access + the related document""" + super().check_access_rule(operation) + if self.env.is_superuser(): + return + default_checker = self.env["mail.thread"].get_automation_access + by_model_rec_ids = defaultdict(set) + by_model_checker = {} + for exc_rec in self.sudo(): + if not exc_rec.related_record_exists: + continue + by_model_rec_ids[exc_rec.model].add(exc_rec.res_id) + if exc_rec.model not in by_model_checker: + by_model_checker[exc_rec.model] = getattr( + self.env[exc_rec.model], "get_automation_access", default_checker + ) + + for model, rec_ids in by_model_rec_ids.items(): + records = self.env[model].browse(rec_ids).with_user(self._uid) + checker = by_model_checker[model] + for record in records: + check_operation = checker( + [record.id], operation, model_name=record._name + ) + record.check_access_rights(check_operation) + record.check_access_rule(check_operation) + + def write(self, vals): + self.check_access_rule("write") + return super().write(vals) diff --git a/automation_oca/models/mail_thread.py b/automation_oca/models/mail_thread.py index 1dcedfe..837b7f7 100644 --- a/automation_oca/models/mail_thread.py +++ b/automation_oca/models/mail_thread.py @@ -40,3 +40,31 @@ def _message_route_process(self, message, message_dict, routes): return super(MailThread, self)._message_route_process( message, message_dict, routes ) + + @api.model + def get_automation_access(self, doc_ids, operation, model_name=False): + """Retrieve access policy. + + The behavior is similar to `mail.thread` and `mail.message` + and it relies on the access rules defines on the related record. + The behavior can be customized on the related model + by defining `_automation_record_access`. + + By default `write`, otherwise the custom permission is returned. + """ + DocModel = self.env[model_name] if model_name else self + create_allow = getattr(DocModel, "_automation_record_access", "write") + if operation in ["write", "unlink"]: + check_operation = "write" + elif operation == "create" and create_allow in [ + "create", + "read", + "write", + "unlink", + ]: + check_operation = create_allow + elif operation == "create": + check_operation = "write" + else: + check_operation = operation + return check_operation diff --git a/automation_oca/tests/__init__.py b/automation_oca/tests/__init__.py index c07455f..40a2ac7 100644 --- a/automation_oca/tests/__init__.py +++ b/automation_oca/tests/__init__.py @@ -1,3 +1,4 @@ from . import test_automation_action from . import test_automation_base from . import test_automation_mail +from . import test_automation_security diff --git a/automation_oca/tests/test_automation_security.py b/automation_oca/tests/test_automation_security.py new file mode 100644 index 0000000..c74905b --- /dev/null +++ b/automation_oca/tests/test_automation_security.py @@ -0,0 +1,92 @@ +# Copyright 2024 Dixmit +# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl). + +from odoo.tests.common import users + +from odoo.addons.mail.tests.common import mail_new_test_user + +from .common import AutomationTestCase + + +class TestAutomationSecurity(AutomationTestCase): + @classmethod + def setUpClass(cls): + super().setUpClass() + # Removing rules in order to check only what we expect + cls.env["ir.rule"].search( + [("model_id", "=", cls.env.ref("base.model_res_partner").id)] + ).toggle_active() + + cls.user_automation_01 = mail_new_test_user( + cls.env, + login="user_automation_01", + name="User automation 01", + email="user_automation_01@test.example.com", + company_id=cls.env.user.company_id.id, + notification_type="inbox", + groups="base.group_user,automation_oca.group_automation_user", + ) + cls.user_automation_02 = mail_new_test_user( + cls.env, + login="user_automation_02", + name="User automation 01", + email="user_automation_02@test.example.com", + company_id=cls.env.user.company_id.id, + notification_type="inbox", + groups="base.group_user,automation_oca.group_automation_user", + ) + cls.group_1 = cls.env["res.groups"].create( + { + "name": "G1", + "users": [(4, cls.user_automation_01.id)], + "rule_groups": [ + ( + 0, + 0, + { + "name": "Rule 01", + "model_id": cls.env.ref("base.model_res_partner").id, + "domain_force": "[('id', '!=', %s)]" % cls.partner_01.id, + }, + ) + ], + } + ) + cls.group_2 = cls.env["res.groups"].create( + { + "name": "G2", + "users": [(4, cls.user_automation_02.id)], + "rule_groups": [ + ( + 0, + 0, + { + "name": "Rule 01", + "model_id": cls.env.ref("base.model_res_partner").id, + "domain_force": "[('id', '!=', %s)]" % cls.partner_02.id, + }, + ) + ], + } + ) + cls.configuration.editable_domain = [ + ("id", "in", (cls.partner_01 | cls.partner_02).ids) + ] + cls.configuration.start_automation() + cls.env["automation.configuration"].cron_automation() + + @users("user_automation_01") + def test_security_01(self): + record = self.env["automation.record"].search( + [("configuration_id", "=", self.configuration.id)] + ) + self.assertEqual(1, len(record)) + self.assertEqual(self.partner_02, record.resource_ref) + + @users("user_automation_02") + def test_security_02(self): + record = self.env["automation.record"].search( + [("configuration_id", "=", self.configuration.id)] + ) + self.assertEqual(1, len(record)) + self.assertEqual(self.partner_01, record.resource_ref)