From 5d53823e863def2d6c78868e480c2f4c029c4e19 Mon Sep 17 00:00:00 2001 From: Shahwaiz Punjwani Date: Thu, 9 Jan 2025 09:02:52 -0600 Subject: [PATCH] [DA-4616] AwardeeInSite Backend Job & API (#4032) * Add Awardee InSite API * Add Awardee InSite API tests * Add cloud scheduler to run awardee insite datafeed * Add Awardee InSite Feed * Add datafeed queries for awardee insite * Fix get_id * Update function name * Add awardee insite data feed test * Add enrollment status query * Map enum cols from PS table to strings * Update test * Add test * Update to_client_json * Handle withdrawn participants * Update model * Fix queries & add update withdrawn query * Add awardee insite migration file * Add staticmethod * Disable withdrawn update for testing * Add POST to AwardeeInSiteInputFeed * Fix * Fix table name * Add missing col * Fix * Camel case vals * Add snake_to_camel case func * Fixes * Add ehr cond for testing * Update get_id * Test * Fix bug * Testing * Add logging for debugging * Add logging for debugging * Add logging for debugging * Add logging for debugging * Rm logging/print * Add else unset in query * Debug patient status * Fix * Fixes * Remove patientStatus * Patient Status test * Debug * rdr_service/workflow_management/ppsc/ppsc_data_transfer_input_feed.py * rdr_service/workflow_management/ppsc/ppsc_data_transfer_input_feed.py * Add eval * Add logging, ast * Add patientStatus cond * Fix tests * Improve logging * Fix query * Fix alias * Handle withdrawn participants * Fix * Handle withdrawn pids inside the query * Finalize query * Rm withdrawn col list * Update to_client_json * Update test * Set site default to NULL * Update comment * Correct doc * Update method to use baseclass * Use base dao's method * Update method name * Add awardee insite docs --- doc/api_workflows/api_resource_ref.rst | 1 + .../awardee_insite_field_list.rst | 8 + .../ae17f737a2cc_add_awardee_insite.py | 143 +++++ rdr_service/api/awardee_insite_api.py | 103 ++++ 
rdr_service/dao/awardee_insite_dao.py | 163 ++++++ rdr_service/main.py | 8 + rdr_service/model/awardee_insite.py | 337 ++++++++++++ rdr_service/ppsc_pipeline/main.py | 16 +- .../ppsc/data_feed_queries.py | 489 ++++++++++++++++++ .../ppsc/ppsc_data_transfer_input_feed.py | 67 ++- tests/api_tests/test_awardee_insite_api.py | 364 +++++++++++++ .../test_ppsc_data_transfer_input.py | 104 +++- 12 files changed, 1800 insertions(+), 3 deletions(-) create mode 100644 doc/api_workflows/field_reference/awardee_insite_field_list.rst create mode 100644 rdr_service/alembic/ppsc/versions/ae17f737a2cc_add_awardee_insite.py create mode 100644 rdr_service/api/awardee_insite_api.py create mode 100644 rdr_service/dao/awardee_insite_dao.py create mode 100644 rdr_service/model/awardee_insite.py create mode 100644 tests/api_tests/test_awardee_insite_api.py diff --git a/doc/api_workflows/api_resource_ref.rst b/doc/api_workflows/api_resource_ref.rst index 843b6cd83e..11a085703d 100644 --- a/doc/api_workflows/api_resource_ref.rst +++ b/doc/api_workflows/api_resource_ref.rst @@ -6,6 +6,7 @@ API Resource Reference :maxdepth: 2 Participant Summary Field List + Awardee InSite Field List BioBank Order Field List Enumerated Field Options Reference Patient API Reference diff --git a/doc/api_workflows/field_reference/awardee_insite_field_list.rst b/doc/api_workflows/field_reference/awardee_insite_field_list.rst new file mode 100644 index 0000000000..cba812ac0f --- /dev/null +++ b/doc/api_workflows/field_reference/awardee_insite_field_list.rst @@ -0,0 +1,8 @@ +============================================================ +Awardee InSite Field List +============================================================ + +.. 
autoclass:: rdr_service.model.awardee_insite.AwardeeInSite + :members: + :undoc-members: + :exclude-members: internal_fields, id, modified, created, create_surrogate_key_sql diff --git a/rdr_service/alembic/ppsc/versions/ae17f737a2cc_add_awardee_insite.py b/rdr_service/alembic/ppsc/versions/ae17f737a2cc_add_awardee_insite.py new file mode 100644 index 0000000000..6cf2ef83de --- /dev/null +++ b/rdr_service/alembic/ppsc/versions/ae17f737a2cc_add_awardee_insite.py @@ -0,0 +1,143 @@ +"""add_awardee_insite + +Revision ID: ae17f737a2cc +Revises: 6e07f138ede8 +Create Date: 2024-12-30 11:44:27.237681 + +""" +from alembic import op +import sqlalchemy as sa +import rdr_service.model.utils +from sqlalchemy.dialects import mysql + + +# revision identifiers, used by Alembic. +revision = "ae17f737a2cc" +down_revision = "6e07f138ede8" +branch_labels = None +depends_on = None + + +def upgrade(engine_name): + globals()["upgrade_%s" % engine_name]() + + +def downgrade(engine_name): + globals()["downgrade_%s" % engine_name]() + + +def upgrade_ppsc(): + # ### commands auto generated by Alembic - please adjust! 
### + op.create_table( + "awardee_insite", + sa.Column("id", sa.Integer(), autoincrement=True, nullable=False), + sa.Column("created", rdr_service.model.utils.UTCDateTime(), nullable=True), + sa.Column("modified", rdr_service.model.utils.UTCDateTime(), nullable=True), + sa.Column("participant_id", sa.BigInteger(), nullable=False), + sa.Column("first_name", sa.String(length=255), nullable=True), + sa.Column("middle_name", sa.String(length=255), nullable=True), + sa.Column("last_name", sa.String(length=255), nullable=True), + sa.Column("zip_code", sa.String(length=10), nullable=True), + sa.Column("state", sa.String(length=255), nullable=True), + sa.Column("city", sa.String(length=255), nullable=True), + sa.Column("street_address", sa.String(length=255), nullable=True), + sa.Column("street_address2", sa.String(length=255), nullable=True), + sa.Column("phone_number", sa.String(length=80), nullable=True), + sa.Column("email", sa.String(length=255), nullable=True), + sa.Column("date_of_birth", sa.Date(), nullable=True), + sa.Column("organization", sa.String(length=255), nullable=True), + sa.Column("withdrawal_status", sa.String(length=32), nullable=False), + sa.Column( + "withdrawal_time", rdr_service.model.utils.UTCDateTime(), nullable=True + ), + sa.Column("deactivation_status", sa.String(length=32), nullable=False), + sa.Column( + "deactivation_time", rdr_service.model.utils.UTCDateTime(), nullable=True + ), + sa.Column("deceased_status", sa.String(length=32), nullable=False), + sa.Column( + "deceased_authored", rdr_service.model.utils.UTCDateTime(), nullable=True + ), + sa.Column( + "clinic_physical_measurements_status", sa.String(length=32), nullable=False + ), + sa.Column( + "clinic_physical_measurements_finalized_time", + rdr_service.model.utils.UTCDateTime(), + nullable=True, + ), + sa.Column( + "clinic_physical_measurements_finalized_site", + sa.String(length=255), + nullable=True, + ), + sa.Column( + "self_reported_physical_measurements_status", + 
sa.String(length=32), + nullable=False, + ), + sa.Column( + "self_reported_physical_measurements_authored", + rdr_service.model.utils.UTCDateTime(), + nullable=True, + ), + sa.Column( + "consent_for_electronic_health_records", + sa.String(length=10), + nullable=False, + ), + sa.Column( + "consent_for_electronic_health_records_authored", + rdr_service.model.utils.UTCDateTime(), + nullable=True, + ), + sa.Column( + "consent_for_electronic_health_records_first_yes_authored", + rdr_service.model.utils.UTCDateTime(), + nullable=True, + ), + sa.Column( + "first_ehr_receipt_time", + rdr_service.model.utils.UTCDateTime(), + nullable=True, + ), + sa.Column( + "latest_ehr_receipt_time", + rdr_service.model.utils.UTCDateTime(), + nullable=True, + ), + sa.Column("consent_for_study_enrollment", sa.String(length=10), nullable=False), + sa.Column( + "consent_for_study_enrollment_authored", + rdr_service.model.utils.UTCDateTime(), + nullable=True, + ), + sa.Column("patient_status", mysql.JSON(), nullable=True), + sa.Column("enrollment_status", sa.String(length=32), nullable=True), + sa.Column("biospecimen_source_site", sa.String(length=255), nullable=True), + sa.Column( + "biospecimen_order_time", + rdr_service.model.utils.UTCDateTime(), + nullable=True, + ), + sa.Column("biospecimen_status", sa.String(length=255), nullable=False), + sa.Column( + "sample_1sal2_collection_method", sa.String(length=255), nullable=False + ), + sa.Column("sample_status_1sal2", sa.String(length=255), nullable=False), + sa.Column("sample_order_status_1sal2", sa.String(length=255), nullable=False), + sa.Column( + "sample_order_status_1sal2_time", + rdr_service.model.utils.UTCDateTime(), + nullable=True, + ), + sa.PrimaryKeyConstraint("id"), + schema="ppsc", + ) + # ### end Alembic commands ### + + +def downgrade_ppsc(): + # ### commands auto generated by Alembic - please adjust! 
### + op.drop_table("awardee_insite", schema="ppsc") + # ### end Alembic commands ### diff --git a/rdr_service/api/awardee_insite_api.py b/rdr_service/api/awardee_insite_api.py new file mode 100644 index 0000000000..889c5c9c6c --- /dev/null +++ b/rdr_service/api/awardee_insite_api.py @@ -0,0 +1,103 @@ +from flask import request +from werkzeug.exceptions import BadRequest, InternalServerError + +from rdr_service.query import Query, Results +from rdr_service.api.base_api import BaseApi, log_api_request +from rdr_service.app_util import auth_required, get_validated_user_info +from rdr_service.api_util import AWARDEE, RDR +from rdr_service.dao.awardee_insite_dao import AwardeeInSiteDao + + +AWARDEE_INSITE_PAGINATION_MAX_RESULTS = 1000 + + +class AwardeeInSiteApi(BaseApi): + def __init__(self): + super().__init__(AwardeeInSiteDao()) + self.awardee = None + + @auth_required([RDR] + [AWARDEE]) + def get(self, id_=None, participant_id=None): + log_api_request(log=request.log_record) + + _, user_info = get_validated_user_info() + + # Get the "awardee" linked to the user_email from the config + if AWARDEE in user_info["roles"]: + try: + self.awardee = user_info["awardee"] + except KeyError: + raise InternalServerError("Config error for awardee") + + # In case RDR needs to call the API, they can pass an awardee query param with a awardee name to get the data + if RDR in user_info["roles"]: + self.awardee = request.args.get("awardee") + if not self.awardee: + raise BadRequest( + "Awardee not found. Please pass an awardee to the query" + ) + + return self._query() + + def _make_query(self, check_invalid: bool = True) -> Query: + """ + Returns a Query object, setting properties like the max_results to be returned + in a page and field filters (awardee name, last modified). 
+ """ + query_definition = super()._make_query(check_invalid) + + field_filters = [] + if self.awardee: + field_filters.append(self.dao.make_query_filter("awardee", self.awardee)) + + if len(request.args) > 0: + for key, value in request.args.items(multi=True): + if key == "updatedSince": + field_filters.append(self.dao.make_query_filter(key, value)) + + query_definition.field_filters = field_filters + query_definition.max_results = AWARDEE_INSITE_PAGINATION_MAX_RESULTS + + return query_definition + + def _query(self) -> dict: + """ + Called in GET function. Creates a Query object and then runs that + query and return the payload. + """ + query_definition: Query = self._make_query() + results: Results = self.dao.query(query_definition) + payload: dict = self._make_bundle(results) + return payload + + def _make_bundle(self, results: Results) -> dict: + """ + Return response in a dict. If pagination token exists (meaning there is a next page), it creates + a URL so that the client can call that URL to retrieve the next page. The URL to get the + next page, and the participants results are added into the dictionary to be sent to the client. + + :param results: Result object containing the results of the query in + the item attribute and pagination token in the pagination_token attribute. + :return: Payload that will be sent in the GET request. 
+ """ + + from rdr_service import main + + bundle_dict = {"resourceType": "Bundle", "type": "searchset"} + if results.pagination_token: + query_params = request.args.copy() + query_params["_token"] = results.pagination_token + next_url = main.api.url_for( + self.__class__, _external=True, **query_params.to_dict(flat=False) + ) + bundle_dict["link"] = [{"relation": "next", "url": next_url}] + + entries = [] + for item in results.items: + resource = self._make_response(item[0]) # item = [awardee_model, HPO.name] + entries.append({"resource": resource}) + + bundle_dict["entry"] = entries + if results.total is not None: + bundle_dict["total"] = results.total + return bundle_dict diff --git a/rdr_service/dao/awardee_insite_dao.py b/rdr_service/dao/awardee_insite_dao.py new file mode 100644 index 0000000000..fa0810037c --- /dev/null +++ b/rdr_service/dao/awardee_insite_dao.py @@ -0,0 +1,163 @@ +import re +from datetime import datetime + +from sqlalchemy.orm import Session +from sqlalchemy.orm.query import Query as SQLAlchemyQuery +from werkzeug.exceptions import BadRequest + +from rdr_service.code_constants import UNSET +from rdr_service.model.utils import to_client_participant_id +from rdr_service.model.hpo import HPO +from rdr_service.model.organization import Organization +from rdr_service.model.awardee_insite import AwardeeInSite +from rdr_service.dao.base_dao import UpsertableDao +from rdr_service.query import Results, Query, FieldFilter, Operator + + +class AwardeeInSiteDao(UpsertableDao): + def __init__(self): + super().__init__(AwardeeInSite) + self.total_result = None + + def snake_to_camel_case(self, string_value: str) -> str: + string = self.snake_to_camel(string_value) + # Replace sal with SAL + return re.sub(r'(\d)sal(\d)', r'\1SAL\2', string, flags=re.IGNORECASE) + + def get_id(self, obj: AwardeeInSite) -> int | None: + """ + Return id if a participant already exists in the table, + otherwise return None. 
This function is used for updating an + existing record in MySQL table when streaming from BQ. + """ + with self.session() as session: + query = session.query(AwardeeInSite.id).filter( + AwardeeInSite.participantId == obj.participantId + ) + res = query.first() + + if res: + return res.id + + def make_query_filter(self, field_name: str, value: str) -> FieldFilter: + """ + Return the filter object that will be used as argument to create a Query object. + Called from _make_query in the API. + """ + if field_name == "awardee": + return FieldFilter("HPO.name", Operator.EQUALS, value) + if field_name == "updatedSince": + return FieldFilter( + "AwardeeInSite.modified", + Operator.GREATER_THAN_OR_EQUALS, + datetime.strptime(value, "%Y-%m-%d"), + ) + + @staticmethod + def add_awardee_col(query: SQLAlchemyQuery) -> SQLAlchemyQuery: + """ + Add awardee (HPO name) to the table, so we can filter on it. + This will be utilized to ensure that we only return participants + associated with an awardee when that awardee calls the API. + """ + query = ( + query.add_columns(HPO.name) + .outerjoin( + Organization, AwardeeInSite.organization == Organization.externalId + ) + .join(HPO, HPO.hpoId == Organization.hpoId) + ) # filter out obsolete HPOs? + return query + + def _set_filters( + self, query: SQLAlchemyQuery, filter_list: list[FieldFilter] + ) -> SQLAlchemyQuery: + """Add filters to SQLAlchemy query and then return that query""" + str_to_model_map = {"HPO": HPO, "AwardeeInSite": AwardeeInSite} + for field_filter in filter_list: + model, col = field_filter.field_name.split(".") + try: + filter_attribute = getattr(str_to_model_map[model], col) + except AttributeError: + raise BadRequest( + f"No field named {field_filter.field_name} found on {model}." 
+ ) + query = self._add_filter(query, field_filter, filter_attribute) + return query + + def _make_query( + self, session: Session, query_definition: Query + ) -> tuple[SQLAlchemyQuery, list]: + """ + Return a SQLAlchemy query from a Query object passed in as a parameter. + Also returns the name of the columns that pagination will use to create token. + Adds pagination filter where it decodes the pagination token to grab only the + records for the next page. + """ + query: SQLAlchemyQuery = super()._initialize_query(session, query_definition) + query = AwardeeInSiteDao.add_awardee_col(query) + query = self._set_filters(query, query_definition.field_filters) + order_by_field_names, order_by_fields, first_descending = ( + ["id"], + [AwardeeInSite.id], + False, + ) + + if query_definition.include_total: + self.total_result = query.count() + + if query_definition.pagination_token: + query = self._add_pagination_filter( + query, query_definition, order_by_fields, first_descending + ) + query.limit(query_definition.max_results + 1) + return query, order_by_field_names + + def query(self, query_definition: Query) -> Results: + with self.session() as session: + # query_definition param passed from API file + query, order_by_field_names = self._make_query(session, query_definition) + items = query.all() + if not items: + return Results([]) + + if len(items) > query_definition.max_results: + page = items[0: query_definition.max_results] + pagination_token = self._make_pagination_token( + items[query_definition.max_results - 1][0].asdict(), + order_by_field_names, + ) + return Results( + page, pagination_token, more_available=True, total=self.total_result + ) + + else: + pagination_token = ( + ( + self._make_pagination_token( + items[-1][0].asdict(), order_by_field_names + ) + ) + if query_definition.always_return_token + else None + ) + return Results( + items, pagination_token, more_available=False, total=self.total_result + ) + + def to_client_json(self, model: 
AwardeeInSite) -> dict: + """ + Returns a response dict containing required values for a given row. + If a column is null, it sets it to 'UNSET'. + """ + result = model.asdict() + + for field in AwardeeInSite.internal_fields: + del result[field] + + result["participantId"] = to_client_participant_id(result["participantId"]) + final_result = { + key: value if isinstance(value, list) else value or UNSET if value != UNSET.lower() else UNSET + for key, value in result.items() + } + return final_result diff --git a/rdr_service/main.py b/rdr_service/main.py index a46b30dffe..e53624f455 100644 --- a/rdr_service/main.py +++ b/rdr_service/main.py @@ -51,6 +51,7 @@ from rdr_service.api.nph_participant_biobank_order_api import NphOrderApi, DlwDosageApi from rdr_service.api.nph_participant_api import nph_participant from rdr_service.api.site_hierarchy_api import SiteHierarchyApi +from rdr_service.api.awardee_insite_api import AwardeeInSiteApi from rdr_service.services.flask import app, API_PREFIX, flask_warmup, flask_start, flask_stop from rdr_service.services.gcp_logging import begin_request_logging, end_request_logging, \ @@ -475,6 +476,13 @@ def _log_request_exception(sender, exception, **extra): # pylint: disable=unuse methods=['POST', 'DELETE'] ) +api.add_resource( + AwardeeInSiteApi, + API_PREFIX + 'AwardeeInSite', + endpoint='awardeeinsite', + methods=['GET'] +) + app.add_url_rule("/_ah/warmup", endpoint="warmup", view_func=flask_warmup, methods=["GET"]) app.add_url_rule("/_ah/start", endpoint="start", view_func=flask_start, methods=["GET"]) app.add_url_rule("/_ah/stop", endpoint="stop", view_func=flask_stop, methods=["GET"]) diff --git a/rdr_service/model/awardee_insite.py b/rdr_service/model/awardee_insite.py new file mode 100644 index 0000000000..19523e6d4d --- /dev/null +++ b/rdr_service/model/awardee_insite.py @@ -0,0 +1,337 @@ +from sqlalchemy import Column, BigInteger, Integer, String, Date, event +from sqlalchemy.dialects.mysql import JSON + +from 
rdr_service.model.utils import UTCDateTime +from rdr_service.model.base import ( + model_insert_listener, + model_update_listener, + PPSCBase, +) + + +class AwardeeInSite(PPSCBase): + __tablename__ = "awardee_insite" + + internal_fields = ["id", "created", "modified"] + + id = Column("id", Integer, autoincrement=True, primary_key=True) + created = Column("created", UTCDateTime) + modified = Column("modified", UTCDateTime) + + participantId = Column("participant_id", BigInteger, nullable=False) + """ + PMI-specific ID generated by the RDR and used for tracking/linking participant data. + Human-readable 10-character string beginning with P. + """ + + firstName = Column("first_name", String(255), nullable=True) + """The first name of the participant.""" + middleName = Column("middle_name", String(255), nullable=True) + """The middle name of the participant.""" + lastName = Column("last_name", String(255), nullable=True) + """The last name of the participant.""" + zipCode = Column("zip_code", String(10), nullable=True) + """The postal zip code of the participant.""" + state = Column("state", String(255), nullable=True) + """The state the participant lives in.""" + city = Column("city", String(255), nullable=True) + """The city the participant lives in.""" + streetAddress = Column("street_address", String(255), nullable=True) + """Line 1 of the street address the participant lives at.""" + streetAddress2 = Column("street_address2", String(255), nullable=True) + """Line 2 of the street address the participant lives at. 
Absent if no line 2 given.""" + phoneNumber = Column("phone_number", String(80), nullable=True) + """The phone number of the participant.""" + email = Column("email", String(255), nullable=True) + """The email address participant provided to register.""" + dateOfBirth = Column("date_of_birth", Date, nullable=True) + """The day the participant was born.""" + + organization = Column("organization", String(255), nullable=True) + """An organization a participant is paired with or UNSET if none.""" + + withdrawalStatus = Column( + "withdrawal_status", String(32), nullable=False, default="not_withdrawn" + ) + """ + Indicates whether the participant has withdrawn from the study and does not want their data used in future. + + Values: + + * not_withdrawn + * withdrawn + """ + withdrawalTime = Column("withdrawal_time", UTCDateTime, nullable=True) + """The date and time at which the participant withdrew from the study.""" + + deactivationStatus = Column( + "deactivation_status", String(32), nullable=False, default="not_deactivated" + ) + """ + Indicates whether the participant has indicated they do not want to be contacted anymore; + also shouldn’t have any EHR data transferred after the given suspension date. + + Values: + + * not_deactivated + * deactivated + """ + deactivationTime = Column("deactivation_time", UTCDateTime, nullable=True) + """ + The date and time at which the participant has indicated they do not want to be contacted anymore; + also shouldn’t have any EHR data transferred after the given suspension date. + """ + + deceasedStatus = Column( + "deceased_status", String(32), nullable=False, default="unset" + ) + """ + Indicates whether the participant has a pending or approved deceased report. 
+ + Values: + + * Pending = pending report + * Deceased = report approved + * unset = no deceased report or a denied deceased report + """ + deceasedAuthored = Column("deceased_authored", UTCDateTime, nullable=True) + """ + The UTC timestamp of when the report was entered into an external system, + or when it was approved externally if it has been approved. + """ + + clinicPhysicalMeasurementsStatus = Column( + "clinic_physical_measurements_status", + String(32), + nullable=False, + default="unset", + ) + """ + Indicates whether this participant has completed physical measurements. + + Values: + + * unset + * Completed + * Cancelled + """ + clinicPhysicalMeasurementsFinalizedTime = Column( + "clinic_physical_measurements_finalized_time", UTCDateTime, nullable=True + ) + """Indicates the latest time physical measurements were finalized for the participant.""" + clinicPhysicalMeasurementsFinalizedSite = Column( + "clinic_physical_measurements_finalized_site", String(255), nullable=True + ) + """An indicator for the site where the physical measurements were finalized for each participant.""" + + selfReportedPhysicalMeasurementsStatus = Column( + "self_reported_physical_measurements_status", + String(32), + nullable=False, + default="unset", + ) + """ + Indicates whether this participant has completed self-reported physical measurements. + + Values: + + * unset + * Completed + * Cancelled + """ + selfReportedPhysicalMeasurementsAuthored = Column( + "self_reported_physical_measurements_authored", UTCDateTime, nullable=True + ) + """Indicates the latest time the participant authored the survey for self-reporting physical measurements.""" + + consentForElectronicHealthRecords = Column( + "consent_for_electronic_health_records", + String(10), + nullable=False, + default="no", + ) + """ + Indicates whether electronic health records (EHR) consent has been received. If an EHR authorization has expired, + the value will be set back to no. 
+ + Values: + + * yes + * no + """ + consentForElectronicHealthRecordsAuthored = Column( + "consent_for_electronic_health_records_authored", UTCDateTime, nullable=True + ) + """ + Indicates the latest time at which the participant completed consent with the answer indicated in the status field. + """ + consentForElectronicHealthRecordsFirstYesAuthored = Column( + "consent_for_electronic_health_records_first_yes_authored", + UTCDateTime, + nullable=True, + ) + """Indicates the first time at which the participant consented with Yes""" + + firstEhrReceiptTime = Column("first_ehr_receipt_time", UTCDateTime, nullable=True) + """UTC timestamp indicating when RDR was first made aware of signed and uploaded EHR documents""" + latestEhrReceiptTime = Column("latest_ehr_receipt_time", UTCDateTime, nullable=True) + """UTC timestamp indicating the latest time RDR was aware of signed and uploaded EHR documents""" + + consentForStudyEnrollment = Column( + "consent_for_study_enrollment", String(10), nullable=False, default="no" + ) + """ + Indicates whether enrollment consent, or pediatric permission, has been received. + + Values: + + * yes + * no + """ + consentForStudyEnrollmentAuthored = Column( + "consent_for_study_enrollment_authored", UTCDateTime, nullable=True + ) + """ + The UTC date time of the latest time the participant, or a guardian, completed the necessary consent/permission + forms to be considered consented for study enrollment. + """ + + patientStatus = Column("patient_status", JSON, nullable=True, default=list()) + """ + A flag available for sites of in person enrollment. A participant can have a status from multiple sites. Example: + + .. code-block:: json + + "patientStatus": { + “PITT_UPMC”: “YES”, + “PITT_TEMPLE”: “NO_ACCESS”, + “PITT_SOMETHING”: “NO” + } + + .. note:: + The following values are available. + + * Yes: Confirmed in EHR system. + * No: Not found in EHR system. + * No Access: Unable to check EHR system. + * Unknown: Inconclusive search results. 
+ * Not Applicable (will apply to DVs only). + """ + + enrollmentStatus = Column("enrollment_status", String(32), nullable=True) + """ + Participant’s most recently obtained enrollment status. Statuses are defined by the current data glossary. + + Values: + + * registered + * participant + * participant_ehr_consent + * enrolled + * pmb_eligibile + * core_minus_pm + * core_participant + """ + biospecimenSourceSite = Column( + "biospecimen_source_site", String(255), nullable=True + ) + """The site where the biospecimens were initially created for the participant""" + biospecimenOrderTime = Column("biospecimen_order_time", UTCDateTime, nullable=True) + """The first time at which the biospecimens were finalized in UTC.""" + biospecimenStatus = Column( + "biospecimen_status", String(255), nullable=False, default="unset" + ) + """ + An indicator for whether biospecimens have been finalized for the participant + + Values: + + * unset + * created + * collected + * processed + * finalized + """ + + sample1SAL2CollectionMethod = Column( + "sample_1sal2_collection_method", String(255), nullable=False, default="unset" + ) + """ + Gives how the 1SAL2 sample was collected (ie on site or using a mail kit) + + Values: + + * unset + * mail_kit + * on_site + """ + sampleStatus1SAL2 = Column( + "sample_status_1sal2", String(255), nullable=False, default="unset" + ) + """ + The result of biobank processing on sample 1SAL2. + + Values: + + * unset + * received + * disposed + * consumed + * unknown + * sample_not_received + * sample_not_processed + * accessinging_error + * lab_accident + * qns_for_processing + * quality_issue + """ + sampleOrderStatus1SAL2 = Column( + "sample_order_status_1sal2", String(255), nullable=False, default="unset" + ) + """ + The individual order status of sample 1SAL2. 
+ + Values: + + * unset + * created + * collected + * processed + * finalized + """ + sampleOrderStatus1SAL2Time = Column( + "sample_order_status_1sal2_time", UTCDateTime, nullable=True + ) + """The time the sample was marked as finalized by the processing site.""" + + @classmethod + def create_surrogate_key_sql(cls) -> str: + """ + Generates a SQL string for computing a hash of concatenated column values. + + :return: A SQL string that computes the hash of the concatenated column values. + Eg: >>> create_surrogate_key_sql() + "FARM_FINGERPRINT(CONCAT(COALESCE(CAST(col1 AS STRING), ''), '|' , COALESCE(CAST(col2 AS STRING), '') ))" + """ + keys = [ + column.key + for column in AwardeeInSite.__table__.columns + if column.key not in AwardeeInSite.internal_fields + ] + sql_string = ( + "FARM_FINGERPRINT(CONCAT(" + + ", ".join( + ( + f"COALESCE(CAST({key} AS STRING), ''), '|' " + if key != "patient_status" + else "COALESCE(TO_JSON_STRING(patient_status), '[]'), '|' " + ) + for key in keys + ).rstrip(", '|' ") + + "))" + ) + return sql_string + + +event.listen(AwardeeInSite, "before_insert", model_insert_listener) +event.listen(AwardeeInSite, "before_update", model_update_listener) diff --git a/rdr_service/ppsc_pipeline/main.py b/rdr_service/ppsc_pipeline/main.py index 427c504bc7..e9a4dd3fe1 100644 --- a/rdr_service/ppsc_pipeline/main.py +++ b/rdr_service/ppsc_pipeline/main.py @@ -14,7 +14,8 @@ from rdr_service.services.flask import PPSC_PIPELINE_PREFIX, flask_start, flask_stop from rdr_service.services.gcp_logging import begin_request_logging, end_request_logging,\ flask_restful_log_exception_error -from rdr_service.workflow_management.ppsc.ppsc_data_transfer_input_feed import InputFeed, Intake2SummaryFeed +from rdr_service.workflow_management.ppsc.ppsc_data_transfer_input_feed import InputFeed, Intake2SummaryFeed, \ + AwardeeInSiteFeed @app_util.auth_required_scheduler @@ -27,6 +28,12 @@ def test_job(): return '{"success": "true"}' +@app_util.auth_required_scheduler 
+def awardee_insite_input_feed(): + datafeed = request.get_json().get("datafeed") + input_feed = AwardeeInSiteFeed(project=GAE_PROJECT) + input_feed.run_datafeed(datafeed) + return '{ "success": "true" }' @app_util.auth_required_scheduler def ppsc_data_transfer_input_feed(): @@ -97,6 +104,13 @@ def _build_pipeline_app(): ) # Cloud Scheduler - Scheduler jobs + ppsc_pipeline.add_url_rule( + PPSC_PIPELINE_PREFIX + "AwardeeInSiteInputFeed", + endpoint="awardee_insite_input_feed", + view_func=awardee_insite_input_feed, + methods=["GET", "POST"] + ) + ppsc_pipeline.add_url_rule( PPSC_PIPELINE_PREFIX + "TransferInputFeed", endpoint="ppsc_data_transfer_input_feed", diff --git a/rdr_service/workflow_management/ppsc/data_feed_queries.py b/rdr_service/workflow_management/ppsc/data_feed_queries.py index b7aaa643f9..72be54d583 100644 --- a/rdr_service/workflow_management/ppsc/data_feed_queries.py +++ b/rdr_service/workflow_management/ppsc/data_feed_queries.py @@ -1,5 +1,6 @@ from rdr_service import config from rdr_service.config import GAE_PROJECT +from rdr_service.model.awardee_insite import AwardeeInSite def insert_core_data(project: str, src_operational_dataset: str, destination_dataset: str) -> str: @@ -271,3 +272,491 @@ def get_health_data_to_stream(project: str, destination_dataset: str) -> str: AND t.event_date_time = s.event_date_time ) ;""" + + +def get_awardee_insite_data_to_stream(project: str, destination_dataset: str) -> str: + """Get data for Awardee InSite to stream to MySQL. The SQL will return new + records or if an existing column value changed. 
def get_awardee_insite_data_to_stream(project: str, destination_dataset: str) -> str:
    """Get data for Awardee InSite to stream to MySQL.

    The SQL returns rows that are new, or whose surrogate key differs from the
    row already present in ``ppsc_awardee_insite`` (i.e. some column value
    changed).  Of the staged rows, only the most recently created record per
    participant is considered.

    :param project: GCP project containing the datasets.
    :param destination_dataset: Dataset holding both the staging table and the
        streamed ``ppsc_awardee_insite`` table.
    :return: The SELECT statement as a string.
    """
    return f"""
    WITH awardee_insite_with_surrogate_key AS (
        SELECT participant_id
            , {AwardeeInSite.create_surrogate_key_sql()} AS surrogate_key
        FROM `{project}.{destination_dataset}.ppsc_awardee_insite`
    ),
    most_recent_datafeed_records AS (
        SELECT *
        FROM (
            SELECT *
                , ROW_NUMBER() OVER (PARTITION BY participant_id ORDER BY created DESC) AS rn
            FROM `{project}.{destination_dataset}.datafeed_input_awardee_insite`
        )
        WHERE rn = 1
    )
    SELECT * EXCEPT (surrogate_key, created, rn)
    FROM most_recent_datafeed_records mrdr
    WHERE NOT EXISTS (
        SELECT 1
        FROM awardee_insite_with_surrogate_key ai
        WHERE mrdr.participant_id = ai.participant_id
        AND mrdr.surrogate_key = ai.surrogate_key
    )
    """
biospecimen_order_time + , biospecimen_status + , sample_1sal2_collection_method + , sample_status_1sal2 + , sample_order_status_1sal2 + , sample_order_status_1sal2_time + ) + WITH + participant_cte AS ( + SELECT id AS participant_id + FROM `{project}.{src_operational_dataset}.ppsc_participant` + WHERE ignore_flag = 0 + ), + profile_pivot AS ( + SELECT participant_id + , piiname_first AS first_name + , piiname_middle AS middle_name + , piiname_last AS last_name + , streetaddress_piizip AS zip_code + , streetaddress_piistate AS state + , streetaddress_piicity AS city + , piiaddress_streetaddress AS street_address + , piiaddress_streetaddress2 AS street_address2 + , piicontactinformation_phone AS phone_number + , piicontactinformation_email AS email + , piibirthinformation_birthdate AS date_of_birth + FROM + ( + SELECT participant_id + , data_element_name + , data_element_value + FROM `{project}.{src_operational_dataset}.ppsc_profile_updates_event` + WHERE ignore_flag = 0 + ) + PIVOT(ANY_VALUE(data_element_value) + FOR data_element_name IN + ('piiname_first' + , 'piiname_middle' + , 'piiname_last' + , 'streetaddress_piizip' + , 'streetaddress_piistate' + , 'streetaddress_piicity' + , 'piiaddress_streetaddress' + , 'piiaddress_streetaddress2' + , 'piicontactinformation_phone' + , 'piicontactinformation_email' + , 'piibirthinformation_birthdate' + ) + ) + ), + organization_cte AS ( + SELECT participant_id + , event_id + , MAX(event_authored_time) AS activity_date_time + , MAX(CASE WHEN REPLACE(data_element_name, '\u200B', '') = 'activity_status' THEN data_element_value END) AS activity_status + FROM `{project}.{src_operational_dataset}.ppsc_attribution_event` + WHERE event_type_name = 'Org Attribution' AND ignore_flag = 0 + GROUP BY 1, 2 + ), + latest_organization AS ( + SELECT participant_id + , activity_status AS organization + FROM ( + SELECT participant_id + , activity_status + , activity_date_time + , ROW_NUMBER() OVER(PARTITION BY participant_id ORDER BY 
activity_date_time DESC) AS rn + FROM organization_cte + ) + WHERE rn = 1 + ), + withdrawn_cte AS ( + SELECT participant_id + , event_id + , MAX(event_authored_time) AS activity_date_time + , MAX(CASE WHEN REPLACE(data_element_name, '\u200B', '') = 'activity_status' THEN data_element_value END) AS activity_status + FROM `{project}.{src_operational_dataset}.ppsc_withdrawal_event` + WHERE LOWER(event_type_name) = 'withdrawal' AND ignore_flag = 0 + GROUP BY 1, 2 + ), + latest_withdrawn AS ( + SELECT participant_id + , activity_status AS withdrawal_status + , activity_date_time AS withdrawal_time + FROM ( + SELECT participant_id + , activity_status + , activity_date_time + , ROW_NUMBER() OVER(PARTITION BY participant_id ORDER BY activity_date_time DESC) AS rn + FROM withdrawn_cte + ) + WHERE rn = 1 + ), + deactivation_cte AS ( + SELECT participant_id + , event_id + , MAX(event_authored_time) AS activity_date_time + , MAX(CASE WHEN REPLACE(data_element_name, '\u200B', '') = 'activity_status' THEN data_element_value END) AS activity_status + FROM `{project}.{src_operational_dataset}.ppsc_deactivation_event` + WHERE LOWER(event_type_name) = 'deactivation' AND ignore_flag = 0 + GROUP BY 1, 2 + ), + latest_deactivation AS ( + SELECT participant_id + , activity_status AS deactivation_status + , activity_date_time AS deactivation_time + FROM ( + SELECT participant_id + , activity_status + , activity_date_time + , ROW_NUMBER() OVER(PARTITION BY participant_id ORDER BY activity_date_time DESC) AS rn + FROM deactivation_cte + ) + WHERE rn = 1 + ), + deceased_cte AS ( + SELECT participant_id + , event_id + , MAX(event_authored_time) AS activity_date_time + , MAX(CASE WHEN REPLACE(data_element_name, '\u200B', '') = 'activity_status' THEN data_element_value END) AS activity_status + FROM `{project}.{src_operational_dataset}.ppsc_participant_status_event` + WHERE LOWER(event_type_name) = 'death' AND ignore_flag = 0 + GROUP BY 1, 2 + ), + latest_deceased AS ( + SELECT participant_id 
+ , activity_status AS deceased_status + , activity_date_time AS deceased_authored + FROM ( + SELECT participant_id + , activity_status + , activity_date_time + , ROW_NUMBER() OVER(PARTITION BY participant_id ORDER BY activity_date_time DESC) AS rn + FROM deceased_cte + ) + WHERE rn = 1 + ), + ehr_cte AS ( + SELECT participant_id + , event_id + , MAX(event_authored_time) AS activity_date_time + , MAX(CASE WHEN REPLACE(data_element_name, '\u200B', '') = 'activity_status' THEN data_element_value END) AS activity_status + FROM `{project}.{src_operational_dataset}.ppsc_consent_event` + WHERE event_type_name ='EHR Authorization' AND ignore_flag = 0 + GROUP BY 1, 2 + ), + ehr_latest_submitted AS ( + SELECT participant_id + , activity_status AS consent_for_electronic_health_records + , activity_date_time AS consent_for_electronic_health_records_authored + FROM ( + SELECT participant_id + , activity_status + , activity_date_time + , ROW_NUMBER() OVER(PARTITION BY participant_id ORDER BY activity_date_time DESC) AS rn + FROM ehr_cte + ) + WHERE rn = 1 + ), + ehr_first_yes_submitted AS ( + SELECT participant_id + , MIN(activity_date_time) AS consent_for_electronic_health_records_first_yes_authored + FROM ehr_cte + WHERE activity_status = 'Yes' + GROUP BY 1 + ), + ehr_receipt AS ( + SELECT participant_id + , MIN(event_date_time) AS first_ehr_receipt_time + , MAX(event_date_time) AS latest_ehr_receipt_time + FROM `{project}.{destination_dataset}.datafeed_input_ehr` + GROUP BY participant_id + ), + primary_consent_cte AS ( + SELECT participant_id + , event_id + , MAX(event_authored_time) AS activity_date_time + , MAX(CASE WHEN REPLACE(data_element_name, '\u200B', '') = 'activity_status' THEN data_element_value END) AS activity_status + FROM `{project}.{src_operational_dataset}.ppsc_consent_event` + WHERE event_type_name ='Primary Consent' AND ignore_flag = 0 + GROUP BY 1, 2 + ), + primary_consent_latest_submitted AS ( + SELECT participant_id + , activity_status AS 
consent_for_study_enrollment + , activity_date_time AS consent_for_study_enrollment_authored + FROM ( + SELECT participant_id + , activity_status + , activity_date_time + , ROW_NUMBER() OVER(PARTITION BY participant_id ORDER BY activity_date_time DESC) AS rn + FROM primary_consent_cte + ) + WHERE rn = 1 + ), + enrollment_status_cte AS ( + SELECT * + FROM `{project}.{src_operational_dataset}.ppsc_participant_status_event` + WHERE LOWER(event_type_name) = 'enrollment status' AND ignore_flag = 0 + AND LOWER(data_element_name) IN + ('registered', 'participant', 'participant_ehr_consent', 'enrolled', 'pmb_eligible', 'core_minus_pm', 'core_participant') + ), + -- Get most recently received payload Enrollment Status event with a value of yes, but without a no after for that field name + enrollment_status_recent_yes_ranked AS ( + SELECT es1.participant_id + , es1.data_element_name + , ROW_NUMBER() OVER (PARTITION BY es1.participant_id ORDER BY es1.event_authored_time DESC) AS rn + FROM enrollment_status_cte es1 + LEFT JOIN enrollment_status_cte es2 + ON es1.participant_id = es2.participant_id + AND es1.data_element_name = es2.data_element_name + AND LOWER(es2.data_element_value) = 'no' + AND es1.event_authored_time < es2.event_authored_time + WHERE es2.participant_id IS NULL AND LOWER(es1.data_element_value) = 'yes' + ), + enrollment_status_recent_yes AS ( + SELECT participant_id + , data_element_name AS enrollment_status + FROM enrollment_status_recent_yes_ranked + WHERE rn = 1 + ), + participant_summary_cte AS ( + SELECT + participant_id + , clinic_physical_measurements_status + , clinic_physical_measurements_finalized_time + , s1.site_name AS clinic_physical_measurements_finalized_site + , self_reported_physical_measurements_status + , self_reported_physical_measurements_authored + , patient_status + , s2.site_name AS biospecimen_source_site + , biospecimen_order_time + , biospecimen_status + , sample_1sal2_collection_method + , sample_status_1sal2 + , 
sample_order_status_1sal2 + , sample_order_status_1sal2_time + FROM `{project}.{src_operational_dataset}.rdr_participant_summary` ps + LEFT JOIN `{project}.{src_operational_dataset}.rdr_site` s1 + ON ps.clinic_physical_measurements_finalized_site_id = s1.site_id + LEFT JOIN `{project}.{src_operational_dataset}.rdr_site` s2 + ON ps.biospecimen_source_site_id = s2.site_id + ), + default_filled_columns AS ( + SELECT + participant_id + , first_name + , middle_name + , last_name + , zip_code + , state + , city + , street_address + , street_address2 + , phone_number + , email + , date_of_birth + , organization + , COALESCE(withdrawal_status, 'not_withdrawn') AS withdrawal_status + , withdrawal_time + , COALESCE(deactivation_status, 'not_deactivated') AS deactivation_status + , deactivation_time + , COALESCE(deceased_status, 'unset') AS deceased_status + , deceased_authored + , COALESCE(consent_for_electronic_health_records, 'no') AS consent_for_electronic_health_records + , consent_for_electronic_health_records_authored + , consent_for_electronic_health_records_first_yes_authored + , first_ehr_receipt_time + , latest_ehr_receipt_time + , COALESCE(consent_for_study_enrollment, 'no') AS consent_for_study_enrollment + , consent_for_study_enrollment_authored + , enrollment_status + , CASE clinic_physical_measurements_status + WHEN 0 THEN 'unset' + WHEN 1 THEN 'completed' + WHEN 2 THEN 'cancelled' + ELSE 'unset' + END AS clinic_physical_measurements_status + , clinic_physical_measurements_finalized_time + , clinic_physical_measurements_finalized_site + , CASE self_reported_physical_measurements_status + WHEN 0 THEN 'unset' + WHEN 1 THEN 'completed' + ELSE 'unset' + END AS self_reported_physical_measurements_status + , self_reported_physical_measurements_authored + , COALESCE(patient_status, JSON_ARRAY()) AS patient_status + , biospecimen_source_site + , biospecimen_order_time + , CASE biospecimen_status + WHEN 0 THEN 'unset' + WHEN 1 THEN 'created' + WHEN 2 THEN 'collected' + 
WHEN 3 THEN 'processed' + WHEN 4 THEN 'finalized' + ELSE 'unset' + END AS biospecimen_status + , CASE sample_1sal2_collection_method + WHEN 0 THEN 'unset' + WHEN 1 THEN 'mail_kit' + WHEN 2 THEN 'on_site' + ELSE 'unset' + END AS sample_1sal2_collection_method + , CASE sample_status_1sal2 + WHEN 0 THEN 'unset' + WHEN 1 THEN 'received' + WHEN 10 THEN 'disposed' + WHEN 11 THEN 'consumed' + WHEN 12 THEN 'unknown' + WHEN 13 THEN 'sample_not_received' + WHEN 14 THEN 'sample_not_processed' + WHEN 15 THEN 'accessinging_error' + WHEN 16 THEN 'lab_accident' + WHEN 17 THEN 'qns_for_processing' + WHEN 18 THEN 'quality_issue' + ELSE 'unset' + END AS sample_status_1sal2 + , CASE sample_order_status_1sal2 + WHEN 0 THEN 'unset' + WHEN 1 THEN 'created' + WHEN 2 THEN 'collected' + WHEN 3 THEN 'processed' + WHEN 4 THEN 'finalized' + ELSE 'unset' + END AS sample_order_status_1sal2 + , sample_order_status_1sal2_time + FROM participant_cte + LEFT JOIN profile_pivot + USING (participant_id) + LEFT JOIN latest_organization + USING (participant_id) + LEFT JOIN latest_withdrawn + USING (participant_id) + LEFT JOIN latest_deactivation + USING (participant_id) + LEFT JOIN latest_deceased + USING (participant_id) + LEFT JOIN ehr_latest_submitted + USING (participant_id) + LEFT JOIN ehr_first_yes_submitted + USING (participant_id) + LEFT JOIN ehr_receipt + USING (participant_id) + LEFT JOIN primary_consent_latest_submitted + USING (participant_id) + LEFT JOIN enrollment_status_recent_yes + USING (participant_id) + LEFT JOIN participant_summary_cte + USING (participant_id) + ), + withdrawn_update AS ( + SELECT + participant_id, + first_name, + middle_name, + last_name, + IF(withdrawal_status = 'withdrawn', NULL, zip_code) AS zip_code, + IF(withdrawal_status = 'withdrawn', NULL, state) AS state, + IF(withdrawal_status = 'withdrawn', NULL, city) AS city, + IF(withdrawal_status = 'withdrawn', NULL, street_address) AS street_address, + IF(withdrawal_status = 'withdrawn', NULL, street_address2) AS 
street_address2, + IF(withdrawal_status = 'withdrawn', NULL, phone_number) AS phone_number, + IF(withdrawal_status = 'withdrawn', NULL, email) AS email, + date_of_birth, + organization, + withdrawal_status, + withdrawal_time, + IF(withdrawal_status = 'withdrawn', 'unset', deactivation_status) AS deactivation_status, + IF(withdrawal_status = 'withdrawn', NULL, deactivation_time) AS deactivation_time, + IF(withdrawal_status = 'withdrawn', 'unset', deceased_status) AS deceased_status, + IF(withdrawal_status = 'withdrawn', NULL, deceased_authored) AS deceased_authored, + consent_for_electronic_health_records, + consent_for_electronic_health_records_authored, + IF(withdrawal_status = 'withdrawn', NULL, consent_for_electronic_health_records_first_yes_authored) AS consent_for_electronic_health_records_first_yes_authored, + IF(withdrawal_status = 'withdrawn', NULL, first_ehr_receipt_time) AS first_ehr_receipt_time, + IF(withdrawal_status = 'withdrawn', NULL, latest_ehr_receipt_time) AS latest_ehr_receipt_time, + consent_for_study_enrollment, + consent_for_study_enrollment_authored, + enrollment_status, + IF(withdrawal_status = 'withdrawn', 'unset', clinic_physical_measurements_status) AS clinic_physical_measurements_status, + IF(withdrawal_status = 'withdrawn', NULL, clinic_physical_measurements_finalized_time) AS clinic_physical_measurements_finalized_time, + IF(withdrawal_status = 'withdrawn', NULL, clinic_physical_measurements_finalized_site) AS clinic_physical_measurements_finalized_site, + IF(withdrawal_status = 'withdrawn', 'unset', self_reported_physical_measurements_status) AS self_reported_physical_measurements_status, + IF(withdrawal_status = 'withdrawn', NULL, self_reported_physical_measurements_authored) AS self_reported_physical_measurements_authored, + IF(withdrawal_status = 'withdrawn', TO_JSON([]), patient_status) AS patient_status, + IF(withdrawal_status = 'withdrawn', NULL, biospecimen_source_site) AS biospecimen_source_site, + IF(withdrawal_status = 
'withdrawn', NULL, biospecimen_order_time) AS biospecimen_order_time, + IF(withdrawal_status = 'withdrawn', 'unset', biospecimen_status) AS biospecimen_status, + IF(withdrawal_status = 'withdrawn', 'unset', sample_1sal2_collection_method) AS sample_1sal2_collection_method, + IF(withdrawal_status = 'withdrawn', 'unset', sample_status_1sal2) AS sample_status_1sal2, + IF(withdrawal_status = 'withdrawn', 'unset', sample_order_status_1sal2) AS sample_order_status_1sal2, + IF(withdrawal_status = 'withdrawn', NULL, sample_order_status_1sal2_time) AS sample_order_status_1sal2_time + FROM default_filled_columns + ), + -- creating surrogate key to detect changes + final_result_with_surrogate_key AS ( + SELECT + {AwardeeInSite.create_surrogate_key_sql()} AS surrogate_key + , CURRENT_TIMESTAMP() AS created + , * + FROM withdrawn_update + ) + + SELECT * + FROM final_result_with_surrogate_key fr + WHERE NOT EXISTS ( + SELECT 1 + FROM `{project}.{destination_dataset}.datafeed_input_awardee_insite` staging_data + WHERE staging_data.participant_id = fr.participant_id -- to detect new pids + AND staging_data.surrogate_key = fr.surrogate_key -- to detect updated records + ); + """ diff --git a/rdr_service/workflow_management/ppsc/ppsc_data_transfer_input_feed.py b/rdr_service/workflow_management/ppsc/ppsc_data_transfer_input_feed.py index 66d8550bb2..8580a462c5 100644 --- a/rdr_service/workflow_management/ppsc/ppsc_data_transfer_input_feed.py +++ b/rdr_service/workflow_management/ppsc/ppsc_data_transfer_input_feed.py @@ -1,3 +1,4 @@ +import ast import logging from abc import ABC, abstractmethod @@ -9,9 +10,10 @@ from rdr_service.dao.participant_dao import ParticipantDao from rdr_service.dao.participant_summary_dao import ParticipantSummaryDao from rdr_service.dao.ppsc_partner_transfer_dao import PPSCDataTransferBaseDao +from rdr_service.dao.awardee_insite_dao import AwardeeInSiteDao from rdr_service.model.participant_summary import ParticipantSummary +from 
class AwardeeInSiteFeed(PPSCBigQueryDatafeedBase):
    """Datafeed that stages Awardee InSite rows in BigQuery and streams new or
    changed rows into the MySQL ``awardee_insite`` table."""

    def __init__(self, project='test'):
        self.project = project
        self.bq_client = bigquery.Client()

    def make_datafeed_job(self, job_def: str):
        """Runs the query in BQ and returns the result."""
        return self.bq_client.query(job_def).result()

    def get_datafeed_definition(self) -> dict:
        """Return the staging SQL, streaming SQL, and destination model for this feed."""
        src = config.getSettingJson(config.PPSC_DATAFEED_SRC_DATASET)[0]
        destination = config.getSettingJson(config.PPSC_DATAFEED_DEST_DATASET)[0]

        return {
            "staging_data_sql": data_feed_queries.insert_awardee_insite_data(
                self.project, src, destination
            ),
            "streaming_data_sql": data_feed_queries.get_awardee_insite_data_to_stream(
                self.project, destination
            ),
            "destination_model": AwardeeInSite,
        }

    @staticmethod
    def row_to_dict(row: bigquery.Row) -> dict:
        """Convert a BigQuery Row into a plain dict keyed by column name."""
        return {key: row[key] for key in row.keys()}

    def run_datafeed(self, datafeed: str) -> None:
        """
        Stage Awardee InSite data in BigQuery, then upsert every new or
        changed row into MySQL.

        :param datafeed: Feed name, used only for logging.
        """
        datafeed_def = self.get_datafeed_definition()
        self.make_datafeed_job(datafeed_def["staging_data_sql"])  # Stage data rows
        streaming_data_rows = self.make_datafeed_job(datafeed_def["streaming_data_sql"])

        dao = AwardeeInSiteDao()
        streamed_count = 0
        for row in streaming_data_rows:
            awardee_insite_dict = self.row_to_dict(row)
            camel_case_awardee_insite_dict = {
                dao.snake_to_camel_case(key): val for key, val in awardee_insite_dict.items()
            }
            id_ = dao.get_id(AwardeeInSite(**camel_case_awardee_insite_dict))
            logging.info(f"""Streaming P{camel_case_awardee_insite_dict["participantId"]}""")
            if id_:
                # Carrying the primary key makes the upsert update the existing
                # MySQL record instead of inserting a duplicate.
                camel_case_awardee_insite_dict["id"] = id_

            # BigQuery JSON columns come back serialized as a string; restore
            # patientStatus to a Python list before handing it to the model.
            if isinstance(camel_case_awardee_insite_dict["patientStatus"], str):
                camel_case_awardee_insite_dict["patientStatus"] = ast.literal_eval(
                    camel_case_awardee_insite_dict["patientStatus"]
                )

            dao.upsert(AwardeeInSite(**camel_case_awardee_insite_dict))
            streamed_count += 1

        if not streamed_count:
            # NOTE: the previous `if streaming_data_rows:` truthiness check was
            # never False for a BigQuery RowIterator (iterators without
            # __bool__ are always truthy), so this warning was unreachable for
            # empty result sets.  Counting streamed rows fixes that.
            logging.warning(f"No rows to add to {datafeed} Data Feed")
"email": "john@example.com", + "dateOfBirth": "1992-06-08", + "organization": self.pitt_org_name, + }, + { + "participantId": 2299, + "firstName": "Alex", + "middleName": None, + "lastName": "Smith", + "zipCode": "24354", + "state": "NY", + "city": "Albany", + "streetAddress": "123 Forrest Dr", + "streetAddress2": None, + "phoneNumber": "4327685938", + "email": "alex_smith@example.com", + "dateOfBirth": "1989-05-05", + "organization": self.pitt_org_name, + }, + { + "participantId": 3000, + "firstName": "Meed", + "middleName": None, + "lastName": "Jade", + "zipCode": "36509", + "state": "TN", + "city": "Nashville", + "streetAddress": "9832 Albany Dr", + "streetAddress2": "Apt 5", + "phoneNumber": "9874567653", + "email": "meed@example.com", + "dateOfBirth": "1984-05-08", + "organization": self.pitt_org_name, + }, + { + "participantId": 4866, + "firstName": "Jack", + "middleName": None, + "lastName": "Matt", + "zipCode": "45490", + "state": "LA", + "city": "Lafayette", + "streetAddress": "184 Knox Ln", + "streetAddress2": "Apt 67", + "phoneNumber": "9843685667", + "email": "jack_ma@example.com", + "dateOfBirth": "1990-12-18", + "organization": self.az_org_name, + }, + { + "participantId": 5450, + "firstName": "Ali", + "middleName": None, + "lastName": "Mo", + "zipCode": "56491", + "state": "Illinois", + "city": "Chicago", + "streetAddress": "50 Cart Ln", + "streetAddress2": None, + "phoneNumber": "7563974610", + "email": "alimo@example.com", + "dateOfBirth": "1970-02-09", + "organization": self.pitt_org_name, + }, + ] + # Insert records in awardee_insite table + for record in self.awardee_insite_rows: + self.awardee_insite_dao.insert(AwardeeInSite(**record)) + + # Get pids for org in a list + self.pitt_org_pids = [ + record["participantId"] + for record in self.awardee_insite_rows + if record["organization"] == self.pitt_org_name + ] + self.az_org_pids = [ + record["participantId"] + for record in self.awardee_insite_rows + if record["organization"] == 
self.az_org_name + ] + + def overwrite_test_user_awardee( + self, roles: list, awardee: str | None = None + ) -> None: + new_user_info = deepcopy(config.getSettingJson(config.USER_INFO)) + new_user_info["example@example.com"]["roles"] = roles + if awardee: # Add awardee key only for Awardees, and not for rdr + new_user_info["example@example.com"]["awardee"] = awardee + self.temporarily_override_config_setting(config.USER_INFO, new_user_info) + + def test_awardee_insite_caller_roles(self): + """Make sure only RDR and AWARDEE roles can call the API""" + self.overwrite_test_user_awardee(roles=[RDR]) + response = self.send_get("AwardeeInSite?awardee=PITT") + self.assertTrue(response is not None) + + self.overwrite_test_user_awardee(roles=[AWARDEE], awardee="PITT") + response = self.send_get("AwardeeInSite") + self.assertTrue(response is not None) + + self.overwrite_test_user_awardee([HEALTHPRO]) + response = self.send_get("AwardeeInSite", expected_status=http.client.FORBIDDEN) + self.assertTrue(response.status_code == 403) + + def test_rdr_requires_awardee_parameter(self): + """RDR must pass awardee parameter to call the API""" + self.overwrite_test_user_awardee(roles=[RDR]) + response = self.send_get("AwardeeInSite?awardee=PITT") + results = response.get("entry") + results_pid = [ + int(ele["resource"]["participantId"].replace("P", "")) for ele in results + ] + self.assertTrue(response is not None) + self.assertEqual(len(results), len(self.pitt_org_pids)) + self.assertEqual(results_pid, self.pitt_org_pids) + + # Not passing in an awardee query param to the endpoint + response = self.send_get( + "AwardeeInSite", expected_status=http.client.BAD_REQUEST + ) + self.assertTrue(response.status_code == 400) + + def test_response_values(self): + self.overwrite_test_user_awardee(["awardee_sa"], "AZ_TUCSON") + + pid_for_az_org = 4866 # defined in the setUp method + with self.awardee_insite_dao.session() as session: + id_ = ( + session.query(AwardeeInSite.id) + 
.filter(AwardeeInSite.participantId == pid_for_az_org) + .first() + ) + + awardee_insite_values = { + "id": id_, + "deactivationStatus": "deactivated", + "deactivationTime": "2024-11-21T18:12:00", + "consentForElectronicHealthRecords": "no", + "consentForElectronicHealthRecordsAuthored": "2024-11-21T18:12:00", + "firstEhrReceiptTime": "2024-11-25T18:12:00", + "latestEhrReceiptTime": "2024-11-26T18:12:00", + "consentForStudyEnrollment": "yes", + "consentForStudyEnrollmentAuthored": "2024-11-21T18:12:00", + "patientStatus": [], + "enrollmentStatus": "registered", + } + self.awardee_insite_dao.upsert(AwardeeInSite(**awardee_insite_values)) + + expected_result = { + "participantId": "P4866", + "firstName": "Jack", + "middleName": "UNSET", + "lastName": "Matt", + "zipCode": "45490", + "streetAddress": "184 Knox Ln", + "streetAddress2": "Apt 67", + "phoneNumber": "9843685667", + "dateOfBirth": "1990-12-18", + "withdrawalStatus": "not_withdrawn", + "withdrawalTime": "UNSET", + "deactivationStatus": "deactivated", + "deactivationTime": "2024-11-21T18:12:00", + "deceasedStatus": "UNSET", + "deceasedAuthored": "UNSET", + "clinicPhysicalMeasurementsStatus": "UNSET", + "clinicPhysicalMeasurementsFinalizedTime": "UNSET", + "clinicPhysicalMeasurementsFinalizedSite": "UNSET", + "selfReportedPhysicalMeasurementsStatus": "UNSET", + "selfReportedPhysicalMeasurementsAuthored": "UNSET", + "consentForElectronicHealthRecords": "no", + "consentForElectronicHealthRecordsAuthored": "2024-11-21T18:12:00", + "consentForElectronicHealthRecordsFirstYesAuthored": "UNSET", + "firstEhrReceiptTime": "2024-11-25T18:12:00", + "latestEhrReceiptTime": "2024-11-26T18:12:00", + "consentForStudyEnrollment": "yes", + "consentForStudyEnrollmentAuthored": "2024-11-21T18:12:00", + "patientStatus": [], + "enrollmentStatus": "registered", + "biospecimenSourceSite": "UNSET", + "biospecimenOrderTime": "UNSET", + "biospecimenStatus": "UNSET", + "sample1SAL2CollectionMethod": "UNSET", + "sampleStatus1SAL2": 
"UNSET", + "sampleOrderStatus1SAL2": "UNSET", + "sampleOrderStatus1SAL2Time": "UNSET", + "state": "LA", + "city": "Lafayette", + "email": "jack_ma@example.com", + "organization": "AZ_TUCSON_BANNER_HEALTH", + } + + response = self.send_get("AwardeeInSite") + result = response.get("entry")[0]["resource"] + + self.assertEqual(result, expected_result) + + + @patch( + "rdr_service.api.awardee_insite_api.AWARDEE_INSITE_PAGINATION_MAX_RESULTS", 10 + ) + def test_all_participants_for_awardee_are_returned(self): + self.overwrite_test_user_awardee(["awardee_sa"], "PITT") + + response = self.send_get("AwardeeInSite") + results = response.get("entry") + results_pid = [ + int(ele["resource"]["participantId"].replace("P", "")) for ele in results + ] + + self.assertTrue(response is not None) + self.assertEqual(len(results), len(self.pitt_org_pids)) + self.assertEqual(results_pid, self.pitt_org_pids) + + # Test it for another Awardee + self.overwrite_test_user_awardee(["awardee_sa"], "AZ_TUCSON") + response = self.send_get("AwardeeInSite") + results = response.get("entry") + results_pid = [ + int(ele["resource"]["participantId"].replace("P", "")) for ele in results + ] + + self.assertTrue(response is not None) + self.assertEqual(len(results), len(self.az_org_pids)) + self.assertEqual(results_pid, self.az_org_pids) + + @patch( + "rdr_service.api.awardee_insite_api.AWARDEE_INSITE_PAGINATION_MAX_RESULTS", 3 + ) + def test_pagination_with_max_results(self): + self.overwrite_test_user_awardee(["awardee_sa"], "PITT") + page_size = 3 # Make sure this matches patched value in the patch decorator + + response_page_1 = self.send_get("AwardeeInSite") + next_url = response_page_1["link"][0]["url"] + results = response_page_1.get("entry") + results_pid = [ + int(ele["resource"]["participantId"].replace("P", "")) for ele in results + ] + + self.assertTrue(response_page_1 is not None) + self.assertEqual(len(results_pid), len(self.pitt_org_pids[:page_size])) + self.assertEqual(results_pid, 
self.pitt_org_pids[:page_size]) + self.assertTrue( + next_url is not None, "Next URL should exist if there's a next page" + ) + + # Get the 2nd page + token = next_url.split("token=")[-1] + url_with_token = f"AwardeeInSite?_token={token}" + response_page_2 = self.send_get(url_with_token) + response_2_url = response_page_2.get("link") + results = response_page_2.get("entry") + results_pid = [ + int(ele["resource"]["participantId"].replace("P", "")) for ele in results + ] + + self.assertTrue(response_page_2 is not None) + self.assertIsNone(response_2_url, "Next URL should not exist in the last page") + self.assertEqual( + len(results_pid), len(self.pitt_org_pids[page_size : page_size + page_size]) + ) + self.assertEqual( + results_pid, self.pitt_org_pids[page_size : page_size + page_size] + ) + + @patch( + "rdr_service.api.awardee_insite_api.AWARDEE_INSITE_PAGINATION_MAX_RESULTS", 3 + ) + def test_pagination_with_includeTotal_parameter(self): + self.overwrite_test_user_awardee(["awardee_sa"], "PITT") + + response_page_1 = self.send_get("AwardeeInSite?_includeTotal=True") + next_url = response_page_1["link"][0]["url"] + total = response_page_1.get("total") + + self.assertTrue(total is not None, "response should contain total") + self.assertEqual(len(self.pitt_org_pids), total) + + # Get the 2nd page + token = next_url.split("token=")[-1] + url_with_token = f"AwardeeInSite?_token={token}&_includeTotal=True" + response_page_2 = self.send_get(url_with_token) + total = response_page_2.get("total") + + self.assertTrue(total is not None, "response should contain total") + self.assertEqual(len(self.pitt_org_pids), total) + + @patch( + "rdr_service.api.awardee_insite_api.AWARDEE_INSITE_PAGINATION_MAX_RESULTS", 10 + ) + def test_updated_since_parameter(self): + """ + The API should have the ability to only return a set of + participant's that have been modified since a specified date + """ + self.overwrite_test_user_awardee(["awardee_sa"], "PITT") + + # This record should not 
be returned + pid = 8000 + records = [ + { + "participantId": pid, + "firstName": "Erling", + "lastName": "Roe", + "organization": self.pitt_org_name, + }, + ] + + with FakeClock(datetime(2024, 1, 1)): + for record in records: + self.awardee_insite_dao.insert(AwardeeInSite(**record)) + + response = self.send_get("AwardeeInSite?updatedSince=2024-05-01") + results = response.get("entry") + results_pid = [ + int(ele["resource"]["participantId"].replace("P", "")) for ele in results + ] + + self.assertNotIn(pid, results_pid) + + def test_hpo_without_any_participant_returns_none(self): + awardee = "TEST-1" # Not linked to any existing participants + self.overwrite_test_user_awardee(["awardee_sa"], awardee) + + response = self.send_get("AwardeeInSite") + self.assertTrue(response is not None) + self.assertTrue(len(response["entry"]) == 0) + + def tearDown(self): + super().tearDown() + self.clear_table_after_test("ppsc.awardee_insite") diff --git a/tests/workflow_tests/test_ppsc_data_transfer_input.py b/tests/workflow_tests/test_ppsc_data_transfer_input.py index e6ae40bc28..fbd0d5988f 100644 --- a/tests/workflow_tests/test_ppsc_data_transfer_input.py +++ b/tests/workflow_tests/test_ppsc_data_transfer_input.py @@ -6,9 +6,11 @@ from rdr_service.dao.participant_dao import ParticipantDao from rdr_service.dao.participant_summary_dao import ParticipantSummaryDao from rdr_service.dao.ppsc_partner_transfer_dao import PPSCDataTransferBaseDao +from rdr_service.dao.awardee_insite_dao import AwardeeInSiteDao from rdr_service.data_gen.generators.ppsc import PPSCDataGenerator from rdr_service.model.participant_summary import ParticipantSummary from rdr_service.model.ppsc_partner_data_transfer import PPSCEHR +from rdr_service.model.awardee_insite import AwardeeInSite from rdr_service.participant_enums import QuestionnaireStatus, WithdrawalStatus, WithdrawalReason, SuspensionStatus, \ DeceasedStatus, RetentionStatus, RetentionType, GenderIdentity, Race from 
 rdr_service.workflow_management.ppsc.ppsc_to_legacy_de_mappings import map_source_to_summary, \
@@ -21,7 +23,8 @@
     profile_updates_activity_expected_sql, withdrawal_activity_expected_sql, deactivation_activity_expected_sql, \
     participant_status_activity_expected_sql, survey_completion_activity_expected_sql, \
     attribution_activity_expected_sql, insert_sent_records_expected_sql
-from rdr_service.workflow_management.ppsc.ppsc_data_transfer_input_feed import InputFeed, Intake2SummaryFeed
+from rdr_service.workflow_management.ppsc.ppsc_data_transfer_input_feed import InputFeed, Intake2SummaryFeed, \
+    AwardeeInSiteFeed


class DataTransferInputTest(GenomicDataGenMixin):
@@ -172,6 +175,105 @@ def test_run_datafeed(self, mock_make_datafeed_job, mock_bq_client):
        self.assertEqual(actual_rows[0].participant_id, mock_streaming_data_rows[0]['participant_id'])


class AwardeeInSiteDataFeedTest(GenomicDataGenMixin):
    def setUp(self, *args, **kwargs):
        # pylint: disable=unused-argument
        super().setUp()

    @mock.patch("rdr_service.workflow_management.ppsc.ppsc_data_transfer_input_feed.AwardeeInSiteFeed.row_to_dict")
    @mock.patch("google.cloud.bigquery.Client")
    def test_run_datafeed(self, mock_bq_client, mock_row_to_dict):
        """Insert-then-update round trip for the AwardeeInSite data feed.

        BigQuery is mocked out, so the feed consumes the canned rows below
        instead of executing the real staging/streaming SQL. The datafeed
        definition is also mocked so only the row-handling path is exercised.
        """
        record = [{
            "participantId": 12345,
            "firstName": "John",
            "middleName": "Sam",
            "lastName": "Doe",
            "zipCode": "77454",
            "state": "TX",
            "city": "Houston",
            "streetAddress": "123 Lake Dr",
            "streetAddress2": "Apt 34",
            "phoneNumber": "1234567890",
            "email": "john@example.com",
            "dateOfBirth": "1992-06-08",
            "organization": "PA",
            "withdrawalStatus": "not_withdrawn",
            "withdrawalTime": "2024-11-21T18:12:00",
            "deactivationStatus": "not_deactivated",
            "deactivationTime": "2024-11-21T18:12:00",
            "deceasedStatus": "unset",
            "deceasedAuthored": "2024-11-21T18:12:00",
            "consentForElectronicHealthRecords": "yes",
            "consentForElectronicHealthRecordsAuthored": "2024-11-21T18:12:00",
            "firstEhrReceiptTime": "2024-11-25T18:12:00",
            "latestEhrReceiptTime": "2024-11-26T18:12:00",
            "consentForStudyEnrollment": "no",
            "patientStatus": []
        }]

        mock_bq_instance = mock_bq_client.return_value
        mock_bq_instance.query.return_value.result.return_value = record
        mock_row_to_dict.return_value = record[0]

        # Test that the record is correctly inserted
        awardee_insite_feed = AwardeeInSiteFeed()
        awardee_insite_feed.get_datafeed_definition = mock.Mock(return_value={
            "staging_data_sql": "staging_sql",
            "streaming_data_sql": "streaming_sql",
            "destination_model": AwardeeInSite
        })

        awardee_insite_feed.run_datafeed("awardee_insite")

        awardee_insite_dao = AwardeeInSiteDao()
        actual_rows = awardee_insite_dao.get_all()

        # Every string/enum field should land verbatim; date/datetime strings
        # should come back as real date/datetime values from the model.
        self.assertEqual(actual_rows[0].participantId, 12345)
        self.assertEqual(actual_rows[0].firstName, "John")
        self.assertEqual(actual_rows[0].middleName, "Sam")
        self.assertEqual(actual_rows[0].lastName, "Doe")
        self.assertEqual(actual_rows[0].zipCode, "77454")
        self.assertEqual(actual_rows[0].state, "TX")
        self.assertEqual(actual_rows[0].city, "Houston")
        self.assertEqual(actual_rows[0].streetAddress, "123 Lake Dr")
        self.assertEqual(actual_rows[0].streetAddress2, "Apt 34")
        self.assertEqual(actual_rows[0].phoneNumber, "1234567890")
        self.assertEqual(actual_rows[0].email, "john@example.com")
        self.assertEqual(actual_rows[0].dateOfBirth, datetime.date(1992, 6, 8))
        self.assertEqual(actual_rows[0].organization, "PA")
        self.assertEqual(actual_rows[0].withdrawalStatus, "not_withdrawn")
        self.assertEqual(actual_rows[0].withdrawalTime, datetime.datetime(2024, 11, 21, 18, 12))
        self.assertEqual(actual_rows[0].deactivationStatus, "not_deactivated")
        self.assertEqual(actual_rows[0].deactivationTime, datetime.datetime(2024, 11, 21, 18, 12))
        self.assertEqual(actual_rows[0].consentForElectronicHealthRecords, "yes")
        self.assertEqual(actual_rows[0].consentForElectronicHealthRecordsAuthored, datetime.datetime(2024, 11, 21, 18, 12))
        self.assertEqual(actual_rows[0].consentForStudyEnrollment,"no")
        self.assertEqual(actual_rows[0].deceasedStatus, "unset")
        self.assertEqual(actual_rows[0].deceasedAuthored, datetime.datetime(2024, 11, 21, 18, 12))
        self.assertEqual(actual_rows[0].patientStatus, [])

        ####################################################
        # Test updating the middle name of an existing record
        # (a second feed run with the same participantId must update in place,
        # not insert a new row).
        updated_record = [{
            "participantId": 12345,
            "firstName": "John",
            "middleName": "Samuel",
            "state": "NY",
            "patientStatus": []
        }]

        mock_bq_instance.query.return_value.result.return_value = updated_record
        mock_row_to_dict.return_value = updated_record[0]

        awardee_insite_feed.run_datafeed("awardee_insite")

        updated_rows = awardee_insite_dao.get_all()

        self.assertEqual(updated_rows[0].participantId, 12345)
        self.assertEqual(updated_rows[0].middleName, "Samuel")


class Intake2SummaryDataFeedTest(GenomicDataGenMixin):
    def setUp(self, *args, **kwargs) -> None:
        # pylint: disable=unused-argument