Skip to content

Commit

Permalink
[IMP] automation_oca: Add some security filters
Browse files Browse the repository at this point in the history
  • Loading branch information
etobella committed Mar 16, 2024
1 parent 7a4ebf2 commit 1fce83e
Show file tree
Hide file tree
Showing 4 changed files with 250 additions and 0 deletions.
129 changes: 129 additions & 0 deletions automation_oca/models/automation_record.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
# Copyright 2024 Dixmit
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

import logging
from collections import defaultdict

from odoo import api, fields, models

_logger = logging.getLogger(__name__)


class AutomationRecord(models.Model):

Expand Down Expand Up @@ -66,3 +71,127 @@ def _compute_resource_ref(self):
def _compute_name(self):
for record in self:
record.name = self.env[record.model].browse(record.res_id).display_name

@api.model
def _search(
self,
args,
offset=0,
limit=None,
order=None,
count=False,
access_rights_uid=None,
):
ids = super()._search(
args,
offset=offset,
limit=limit,
order=order,
count=False,
access_rights_uid=access_rights_uid,
)
if self.env.is_system():
# restrictions do not apply to group "Settings"
return len(ids) if count else ids

# TODO highlight orphaned EDI records in UI:
# - self.model + self.res_id are set
# - self.record returns empty recordset
# Remark: self.record is @property, not field

if not ids:
return 0 if count else []

Check warning on line 103 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L103

Added line #L103 was not covered by tests
orig_ids = ids
ids = set(ids)
result = []
model_data = defaultdict(
lambda: defaultdict(set)
) # {res_model: {res_id: set(ids)}}
for sub_ids in self._cr.split_for_in_conditions(ids):
self._cr.execute(
"""
SELECT id, res_id, model
FROM "%s"
WHERE id = ANY (%%(ids)s)"""
% self._table,
dict(ids=list(sub_ids)),
)
for eid, res_id, model in self._cr.fetchall():
if not model:
result.append(eid)
continue

Check warning on line 122 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L121-L122

Added lines #L121 - L122 were not covered by tests
model_data[model][res_id].add(eid)

for model, targets in model_data.items():
if not self.env[model].check_access_rights("read", False):
continue

Check warning on line 127 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L127

Added line #L127 was not covered by tests
recs = self.env[model].browse(list(targets))
missing = recs - recs.exists()
if missing:
for res_id in missing.ids:
_logger.warning(

Check warning on line 132 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L132

Added line #L132 was not covered by tests
"Deleted record %s,%s is referenced by edi.exchange.record %s",
model,
res_id,
list(targets[res_id]),
)
recs = recs - missing

Check warning on line 138 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L138

Added line #L138 was not covered by tests
allowed = (
self.env[model]
.with_context(active_test=False)
._search([("id", "in", recs.ids)])
)
for target_id in allowed:
result += list(targets[target_id])
if len(orig_ids) == limit and len(result) < len(orig_ids):
result.extend(

Check warning on line 147 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L147

Added line #L147 was not covered by tests
self._search(
args,
offset=offset + len(orig_ids),
limit=limit,
order=order,
count=count,
access_rights_uid=access_rights_uid,
)[: limit - len(result)]
)
# Restore original ordering
result = [x for x in orig_ids if x in result]
return len(result) if count else list(result)

def read(self, fields=None, load="_classic_read"):
"""Override to explicitely call check_access_rule, that is not called
by the ORM. It instead directly fetches ir.rules and apply them."""
self.check_access_rule("read")
return super().read(fields=fields, load=load)

Check warning on line 165 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L164-L165

Added lines #L164 - L165 were not covered by tests

def check_access_rule(self, operation):
"""In order to check if we can access a record, we are checking if we can access
the related document"""
super().check_access_rule(operation)
if self.env.is_superuser():
return
default_checker = self.env["mail.thread"].get_automation_access
by_model_rec_ids = defaultdict(set)
by_model_checker = {}

Check warning on line 175 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L173-L175

Added lines #L173 - L175 were not covered by tests
for exc_rec in self.sudo():
if not exc_rec.related_record_exists:
continue
by_model_rec_ids[exc_rec.model].add(exc_rec.res_id)

Check warning on line 179 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L178-L179

Added lines #L178 - L179 were not covered by tests
if exc_rec.model not in by_model_checker:
by_model_checker[exc_rec.model] = getattr(

Check warning on line 181 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L181

Added line #L181 was not covered by tests
self.env[exc_rec.model], "get_automation_access", default_checker
)

for model, rec_ids in by_model_rec_ids.items():
records = self.env[model].browse(rec_ids).with_user(self._uid)
checker = by_model_checker[model]

Check warning on line 187 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L186-L187

Added lines #L186 - L187 were not covered by tests
for record in records:
check_operation = checker(

Check warning on line 189 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L189

Added line #L189 was not covered by tests
[record.id], operation, model_name=record._name
)
record.check_access_rights(check_operation)
record.check_access_rule(check_operation)

Check warning on line 193 in automation_oca/models/automation_record.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/automation_record.py#L192-L193

Added lines #L192 - L193 were not covered by tests

def write(self, vals):
self.check_access_rule("write")
return super().write(vals)
28 changes: 28 additions & 0 deletions automation_oca/models/mail_thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,31 @@ def _message_route_process(self, message, message_dict, routes):
return super(MailThread, self)._message_route_process(
message, message_dict, routes
)

@api.model
def get_automation_access(self, doc_ids, operation, model_name=False):
"""Retrieve access policy.
The behavior is similar to `mail.thread` and `mail.message`
and it relies on the access rules defines on the related record.
The behavior can be customized on the related model
by defining `_automation_record_access`.
By default `write`, otherwise the custom permission is returned.
"""
DocModel = self.env[model_name] if model_name else self
create_allow = getattr(DocModel, "_automation_record_access", "write")

Check warning on line 56 in automation_oca/models/mail_thread.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/mail_thread.py#L55-L56

Added lines #L55 - L56 were not covered by tests
if operation in ["write", "unlink"]:
check_operation = "write"

Check warning on line 58 in automation_oca/models/mail_thread.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/mail_thread.py#L58

Added line #L58 was not covered by tests
elif operation == "create" and create_allow in [
"create",
"read",
"write",
"unlink",
]:
check_operation = create_allow

Check warning on line 65 in automation_oca/models/mail_thread.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/mail_thread.py#L65

Added line #L65 was not covered by tests
elif operation == "create":
check_operation = "write"

Check warning on line 67 in automation_oca/models/mail_thread.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/mail_thread.py#L67

Added line #L67 was not covered by tests
else:
check_operation = operation
return check_operation

Check warning on line 70 in automation_oca/models/mail_thread.py

View check run for this annotation

Codecov / codecov/patch

automation_oca/models/mail_thread.py#L69-L70

Added lines #L69 - L70 were not covered by tests
1 change: 1 addition & 0 deletions automation_oca/tests/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from . import test_automation_action
from . import test_automation_base
from . import test_automation_mail
from . import test_automation_security
92 changes: 92 additions & 0 deletions automation_oca/tests/test_automation_security.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
# Copyright 2024 Dixmit
# License AGPL-3.0 or later (https://www.gnu.org/licenses/agpl).

from odoo.tests.common import users

from odoo.addons.mail.tests.common import mail_new_test_user

from .common import AutomationTestCase


class TestAutomationSecurity(AutomationTestCase):
@classmethod
def setUpClass(cls):
super().setUpClass()
# Removing rules in order to check only what we expect
cls.env["ir.rule"].search(
[("model_id", "=", cls.env.ref("base.model_res_partner").id)]
).toggle_active()

cls.user_automation_01 = mail_new_test_user(
cls.env,
login="user_automation_01",
name="User automation 01",
email="[email protected]",
company_id=cls.env.user.company_id.id,
notification_type="inbox",
groups="base.group_user,automation_oca.group_automation_user",
)
cls.user_automation_02 = mail_new_test_user(
cls.env,
login="user_automation_02",
name="User automation 01",
email="[email protected]",
company_id=cls.env.user.company_id.id,
notification_type="inbox",
groups="base.group_user,automation_oca.group_automation_user",
)
cls.group_1 = cls.env["res.groups"].create(
{
"name": "G1",
"users": [(4, cls.user_automation_01.id)],
"rule_groups": [
(
0,
0,
{
"name": "Rule 01",
"model_id": cls.env.ref("base.model_res_partner").id,
"domain_force": "[('id', '!=', %s)]" % cls.partner_01.id,
},
)
],
}
)
cls.group_2 = cls.env["res.groups"].create(
{
"name": "G2",
"users": [(4, cls.user_automation_02.id)],
"rule_groups": [
(
0,
0,
{
"name": "Rule 01",
"model_id": cls.env.ref("base.model_res_partner").id,
"domain_force": "[('id', '!=', %s)]" % cls.partner_02.id,
},
)
],
}
)
cls.configuration.editable_domain = [
("id", "in", (cls.partner_01 | cls.partner_02).ids)
]
cls.configuration.start_automation()
cls.env["automation.configuration"].cron_automation()

@users("user_automation_01")
def test_security_01(self):
record = self.env["automation.record"].search(
[("configuration_id", "=", self.configuration.id)]
)
self.assertEqual(1, len(record))
self.assertEqual(self.partner_02, record.resource_ref)

@users("user_automation_02")
def test_security_02(self):
record = self.env["automation.record"].search(
[("configuration_id", "=", self.configuration.id)]
)
self.assertEqual(1, len(record))
self.assertEqual(self.partner_01, record.resource_ref)

0 comments on commit 1fce83e

Please sign in to comment.