Skip to content

Commit

Permalink
fix: subscription take (#538)
Browse files Browse the repository at this point in the history
* Unnecessary columns removed and topic name NS added to rmw_take

Signed-off-by: ISP akm <[email protected]>

* change style

Signed-off-by: ISP akm <[email protected]>

* Fix pytest

Signed-off-by: ISP akm <[email protected]>

* fix pytest

Signed-off-by: ISP akm <[email protected]>

* fix pytest

Signed-off-by: ISP akm <[email protected]>

* rmw_take trace points are not drawn

Signed-off-by: ISP akm <[email protected]>

* ci(pre-commit): autofix

* Delete unnecessary comments

Signed-off-by: ISP akm <[email protected]>

* Clarify checks when the end of the path is sub_take and include_last_callback, and skip the merge process.

Signed-off-by: ISP akm <[email protected]>

* correct by review

Signed-off-by: ISP akm <[email protected]>

* correct

Signed-off-by: ISP akm <[email protected]>

* correct

Signed-off-by: ISP akm <[email protected]>

* correct

Signed-off-by: ISP akm <[email protected]>

* correct indent

Signed-off-by: ISP akm <[email protected]>

---------

Signed-off-by: ISP akm <[email protected]>
Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com>
  • Loading branch information
xygyo77 and pre-commit-ci[bot] authored Oct 30, 2024
1 parent 42d9cce commit c4cd270
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 78 deletions.
10 changes: 9 additions & 1 deletion src/caret_analyze/infra/lttng/records_provider_lttng.py
Original file line number Diff line number Diff line change
Expand Up @@ -922,7 +922,11 @@ def _compose_inter_proc_comm_records(
if COLUMN_NAME.DDS_WRITE_TIMESTAMP in records.columns:
columns.append(COLUMN_NAME.DDS_WRITE_TIMESTAMP)
columns.append(COLUMN_NAME.SOURCE_TIMESTAMP)
columns.append(COLUMN_NAME.CALLBACK_START_TIMESTAMP)

sub_records = self._source.sub_records(callback_object, None)
is_take_node = len(sub_records) == 0
if not is_take_node:
columns.append(COLUMN_NAME.CALLBACK_START_TIMESTAMP)

self._format(records, columns)

Expand Down Expand Up @@ -982,6 +986,10 @@ def _rename_column(
rename_dict[COLUMN_NAME.SOURCE_TIMESTAMP] = \
f'{topic_name}/{COLUMN_NAME.SOURCE_TIMESTAMP}'

if COLUMN_NAME.RMW_TAKE_TIMESTAMP in records.columns:
rename_dict[COLUMN_NAME.RMW_TAKE_TIMESTAMP] =\
f'{topic_name}/{COLUMN_NAME.RMW_TAKE_TIMESTAMP}'

if COLUMN_NAME.TILDE_SUBSCRIBE_TIMESTAMP in records.columns:
rename_dict[COLUMN_NAME.TILDE_SUBSCRIBE_TIMESTAMP] = \
f'{topic_name}/{COLUMN_NAME.TILDE_SUBSCRIBE_TIMESTAMP}'
Expand Down
61 changes: 35 additions & 26 deletions src/caret_analyze/runtime/path.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,11 @@ def _merge_records(
) -> RecordsInterface:
logger.info('Started merging path records.')

def is_source_timestamp_column(column: str) -> bool:
def is_match_column(column: str, target_name: str) -> bool:
last_slash_index = column.rfind('/')
if last_slash_index >= 0:
column = column[:last_slash_index]
return column.endswith('source_timestamp')
return column.endswith(target_name)

column_merger = ColumnMerger()
if include_first_callback and isinstance(targets[0], NodePath):
Expand Down Expand Up @@ -163,8 +163,6 @@ def is_source_timestamp_column(column: str) -> bool:
right_records)
right_records.rename_columns(rename_rule)

if is_source_timestamp_column(right_records.columns[0]):
left_records.drop_columns([left_records.columns[-1]])
if left_records.columns[-1] != right_records.columns[0]:
raise InvalidRecordsError('left columns[-1] != right columns[0]')

Expand Down Expand Up @@ -210,37 +208,48 @@ def is_source_timestamp_column(column: str) -> bool:
)

if include_last_callback and isinstance(targets[-1], NodePath):
right_records = targets[-1].to_path_end_records()

rename_rule = column_merger.append_columns_and_return_rename_rule(right_records)
right_records.rename_columns(rename_rule)
if left_records.columns[-1] != right_records.columns[0]:
raise InvalidRecordsError('left columns[-1] != right columns[0]')
if len(right_records.data) != 0:
left_records = merge(
left_records=left_records,
right_records=right_records,
join_left_key=left_records.columns[-1],
join_right_key=right_records.columns[0],
columns=Columns.from_str(
left_records.columns + right_records.columns
).column_names,
how='left'
)
if not is_match_column(left_records.columns[-1], 'source_timestamp'):
right_records = targets[-1].to_path_end_records()

rename_rule = column_merger.append_columns_and_return_rename_rule(right_records)
right_records.rename_columns(rename_rule)
if left_records.columns[-1] != right_records.columns[0]:
raise InvalidRecordsError('left columns[-1] != right columns[0]')
if len(right_records.data) != 0:
left_records = merge(
left_records=left_records,
right_records=right_records,
join_left_key=left_records.columns[-1],
join_right_key=right_records.columns[0],
columns=Columns.from_str(
left_records.columns + right_records.columns
).column_names,
how='left'
)
else:
msg = 'Empty records are not merged.'
logger.warn(msg)
else:
msg = 'include_last_callback argument is ignored '
msg += 'because last node receive messages '
msg += 'by `take` method instead of subscription callback.'
msg = 'Since the path cannot be extended, '
msg += 'the merge process for the last callback record is skipped.'
logger.warn(msg)

logger.info('Finished merging path records.')
left_records.sort(first_column)

# search drop columns, which contain 'source_timestamp'
source_columns = \
[column for column in left_records.columns if is_source_timestamp_column(column)]
source_columns = [
column for column in left_records.columns
if is_match_column(column, 'source_timestamp')
]
left_records.drop_columns(source_columns)

rmw_take_column = [
column for column in left_records.columns
if is_match_column(column, 'rmw_take_timestamp')
]
left_records.drop_columns(rmw_take_column)

return left_records


Expand Down
1 change: 0 additions & 1 deletion src/test/infra/lttng/test_latency_definitions.py
Original file line number Diff line number Diff line change
Expand Up @@ -1381,7 +1381,6 @@ def test_inter_proc_empty_data(
columns=[
f'{communication.topic_name}/rclcpp_publish_timestamp',
f'{communication.topic_name}/source_timestamp',
f'{callback.callback_name}/callback_start_timestamp',
],
dtype='Int64'
)
Expand Down
97 changes: 47 additions & 50 deletions src/test/runtime/test_path.py
Original file line number Diff line number Diff line change
Expand Up @@ -596,7 +596,6 @@ def test_take_impl_case(self, mocker):
ColumnValue(f'{topic0}/rcl_publish_timestamp'),
ColumnValue(f'{topic0}/dds_write_timestamp'),
ColumnValue(f'{topic0}/source_timestamp'),
ColumnValue(f'{topic0}/callback_start_timestamp'),
]
)
)
Expand Down Expand Up @@ -625,19 +624,15 @@ def test_take_impl_case(self, mocker):
def append_columns_and_return_rename_rule(records):
if merger_mock.append_columns_and_return_rename_rule.call_count == 1:
return {
f'{topic0}/rclcpp_publish_timestamp': (
f'{topic0}/rclcpp_publish_timestamp/0'
),
f'{topic0}/rcl_publish_timestamp': f'{topic0}/rcl_publish_timestamp/0',
f'{topic0}/dds_write_timestamp': f'{topic0}/dds_write_timestamp/0',
f'{topic0}/source_timestamp': f'{topic0}/source_timestamp/0',
}
f'{topic0}/rclcpp_publish_timestamp': f'{topic0}/rclcpp_publish_timestamp/0',
f'{topic0}/rcl_publish_timestamp': f'{topic0}/rcl_publish_timestamp/0',
f'{topic0}/dds_write_timestamp': f'{topic0}/dds_write_timestamp/0',
f'{topic0}/source_timestamp': f'{topic0}/source_timestamp/0',
}
if merger_mock.append_columns_and_return_rename_rule.call_count == 2:
return {
f'{topic0}/source_timestamp': f'{topic0}/source_timestamp/0',
f'{topic1}/rclcpp_publish_timestamp': (
f'{topic1}/rclcpp_publish_timestamp/0'
),
f'{topic0}/source_timestamp': f'{topic0}/source_timestamp/0',
f'{topic1}/rclcpp_publish_timestamp': f'{topic1}/rclcpp_publish_timestamp/0',
}
mocker.patch.object(
merger_mock, 'append_columns_and_return_rename_rule',
Expand Down Expand Up @@ -719,25 +714,17 @@ def test_take_impl_case_include_first_callback(self, mocker):
def append_columns_and_return_rename_rule(records):
if merger_mock.append_columns_and_return_rename_rule.call_count == 1:
return {
f'{topic0}/callback_start_timestamp': (
f'{topic0}/callback_start_timestamp/0'
),
f'{topic0}/rclcpp_publish_timestamp': (
f'{topic0}/rclcpp_publish_timestamp/0'
),
f'{topic0}/callback_start_timestamp': f'{topic0}/callback_start_timestamp/0',
f'{topic0}/rclcpp_publish_timestamp': f'{topic0}/rclcpp_publish_timestamp/0',
}
if merger_mock.append_columns_and_return_rename_rule.call_count == 2:
return {
f'{topic0}/rclcpp_publish_timestamp': (
f'{topic0}/rclcpp_publish_timestamp/0'
),
f'{topic0}/rcl_publish_timestamp': f'{topic0}/rcl_publish_timestamp/0',
f'{topic0}/dds_write_timestamp': f'{topic0}/dds_write_timestamp/0',
f'{topic0}/source_timestamp': f'{topic0}/source_timestamp/0',
f'{topic1}/callback_start_timestamp': (
f'{topic1}/callback_start_timestamp/0'
),
}
f'{topic0}/rclcpp_publish_timestamp': f'{topic0}/rclcpp_publish_timestamp/0',
f'{topic0}/rcl_publish_timestamp': f'{topic0}/rcl_publish_timestamp/0',
f'{topic0}/dds_write_timestamp': f'{topic0}/dds_write_timestamp/0',
f'{topic0}/source_timestamp': f'{topic0}/source_timestamp/0',
f'{topic1}/callback_start_timestamp': f'{topic1}/callback_start_timestamp/0',
}
mocker.patch.object(
merger_mock, 'append_columns_and_return_rename_rule',
side_effect=append_columns_and_return_rename_rule)
Expand Down Expand Up @@ -780,15 +767,13 @@ def test_take_impl_case_include_last_callback(self, mocker, caplog):
f'{topic0}/rcl_publish_timestamp': 2,
f'{topic0}/dds_write_timestamp': 3,
f'{topic0}/source_timestamp': 4,
f'{topic1}/callback_start_timestamp': 5
}),
],
[
ColumnValue(f'{topic0}/rclcpp_publish_timestamp'),
ColumnValue(f'{topic0}/rcl_publish_timestamp'),
ColumnValue(f'{topic0}/dds_write_timestamp'),
ColumnValue(f'{topic0}/source_timestamp'),
ColumnValue(f'{topic1}/callback_start_timestamp'),
]
)
)
Expand Down Expand Up @@ -816,29 +801,41 @@ def test_take_impl_case_include_last_callback(self, mocker, caplog):
def append_columns_and_return_rename_rule(records):
if merger_mock.append_columns_and_return_rename_rule.call_count == 1:
return {
f'{topic0}/rclcpp_publish_timestamp': (
f'{topic0}/rclcpp_publish_timestamp/0'
),
f'{topic0}/rcl_publish_timestamp': f'{topic0}/rcl_publish_timestamp/0',
f'{topic0}/dds_write_timestamp': f'{topic0}/dds_write_timestamp/0',
f'{topic0}/source_timestamp': f'{topic0}/source_timestamp/0',
f'{topic1}/callback_start_timestamp': (
f'{topic1}/callback_start_timestamp/0'
),
}
f'{topic0}/rclcpp_publish_timestamp': f'{topic0}/rclcpp_publish_timestamp/0',
f'{topic0}/rcl_publish_timestamp': f'{topic0}/rcl_publish_timestamp/0',
f'{topic0}/dds_write_timestamp': f'{topic0}/dds_write_timestamp/0',
f'{topic0}/source_timestamp': f'{topic0}/source_timestamp/0',
}
if merger_mock.append_columns_and_return_rename_rule.call_count == 2:
return {
f'{topic1}/callback_start_timestamp': (
f'{topic1}/callback_start_timestamp/0'
),
f'{topic1}/callback_end_timestamp': f'{topic1}/callback_end_timestamp/0',
}
f'{topic1}/callback_start_timestamp': f'{topic1}/callback_start_timestamp/0',
f'{topic1}/callback_end_timestamp': f'{topic1}/callback_end_timestamp/0',
}
mocker.patch.object(
merger_mock, 'append_columns_and_return_rename_rule',
side_effect=append_columns_and_return_rename_rule)

with caplog.at_level(WARNING):
RecordsMerged([comm_path, node_path], include_last_callback=True)
msg = 'include_last_callback argument is ignored because last node receive messages '
msg += 'by `take` method instead of subscription callback.'
assert caplog.messages[0] == msg
merged = RecordsMerged([comm_path, node_path], include_last_callback=True)
records = merged.data

expected = RecordsCppImpl(
[
RecordCppImpl({
f'{topic0}/rclcpp_publish_timestamp/0': 1,
f'{topic0}/rcl_publish_timestamp/0': 2,
f'{topic0}/dds_write_timestamp/0': 3,
}),
],
[
ColumnValue(f'{topic0}/rclcpp_publish_timestamp/0'),
ColumnValue(f'{topic0}/rcl_publish_timestamp/0'),
ColumnValue(f'{topic0}/dds_write_timestamp/0'),
]
)

assert records.equals(expected)

caplog.set_level(WARNING)
expect = 'Since the path cannot be extended, '
expect += 'the merge process for the last callback record is skipped.'
assert expect in caplog.text

0 comments on commit c4cd270

Please sign in to comment.