diff --git a/core/services/install-services.sh b/core/services/install-services.sh index 9329b62280..d330827f2d 100755 --- a/core/services/install-services.sh +++ b/core/services/install-services.sh @@ -35,7 +35,6 @@ SERVICES=( ping versionchooser wifi - major_tom ) # We need to install loguru, appdirs and pydantic since they may be used inside setup.py diff --git a/core/services/major_tom/main.py b/core/services/major_tom/main.py deleted file mode 100755 index 6063acf1c4..0000000000 --- a/core/services/major_tom/main.py +++ /dev/null @@ -1,85 +0,0 @@ -#! /usr/bin/env python3 -import copy -import datetime -import logging -import sys -import time -import uuid -from typing import Any, Dict -from zoneinfo import ZoneInfo - -import loguru -from commonwealth.utils.general import ( - local_hardware_identifier, - local_unique_identifier, -) -from commonwealth.utils.logs import init_logger - -from src.core import TelemetryEngine, get_latency -from src.metrics import Metrics -from src.typedefs import AnonymousTelemetryRecord, DefaultPayload - -LOG_SESSION_UUID = str(uuid.uuid4()) - -SERVICE_NAME = "major_tom" -LOG_FOLDER_PATH = f"/var/logs/blueos/services/{SERVICE_NAME}/buffer" - -TELEMETRY_ENDPOINT = "https://telemetry.blueos.cloud/api/v1/anonymous/" -S3_TELEMETRY_ENDPOINT = "https://telemetry.blueos.cloud/api/v1/anonymous/s3/" - - -def compose_default_record(order: int) -> Dict[str, Any]: - date_time_utc = datetime.datetime.now(ZoneInfo("UTC")).isoformat() - payload = DefaultPayload( - log_session_uuid=LOG_SESSION_UUID, - order=order, - timestamp=date_time_utc, - hardware_id=local_hardware_identifier(), - blueos_id=local_unique_identifier(), - data=None, - ) - - start_probing = time.time() - metrics = Metrics() - record = AnonymousTelemetryRecord( - time.clock_gettime(time.CLOCK_BOOTTIME), - get_latency(), - metrics.memory.total, - metrics.memory.used, - metrics.disk.total, - metrics.disk.used, - metrics.installed_extensions, - metrics.installed_version, - 0, - ) - record.probe_time = time.time() - start_probing - payload.data = record - return payload.json() - - -if __name__ == "__main__": - - # this is required to have two loggers in the same process - # see https://loguru.readthedocs.io/en/latest/resources/recipes.html#creating-independent-loggers-with-separate-set-of-handlers - loguru.logger.remove() - log_buffer = copy.deepcopy(loguru.logger) - - logging.basicConfig(level=logging.INFO) - loguru.logger.add( - sys.stderr, - format="{time:YYYY-MM-DD HH:mm:ss} | {level: <8} | {name}:{function}:{line} - {message}", - level="INFO", - ) - init_logger(SERVICE_NAME) - loguru.logger.info(f"Starting Major Tom, session UUID: {LOG_SESSION_UUID}") - TelemetryEngine( - label="anonymous", # used to tag telemetry type. we may have non-anonymous telemetry in the future - endpoint=TELEMETRY_ENDPOINT, - s3_endpoint=S3_TELEMETRY_ENDPOINT, - create_record=compose_default_record, - interval=60 * 5, # 5 minutes - max_file_size=1024 * 1024, # 1Mb - max_file_retention=10, - buffer_folder=LOG_FOLDER_PATH, - log_buffer=log_buffer, - )() diff --git a/core/services/major_tom/setup.py b/core/services/major_tom/setup.py deleted file mode 100644 index ac1ee0478c..0000000000 --- a/core/services/major_tom/setup.py +++ /dev/null @@ -1,14 +0,0 @@ -#!/usr/bin/env python3 - -import setuptools - -setuptools.setup( - name="Major Tom", - version="0.1.0", - description="Sends telemetry back to Ground Control", - license="MIT", - install_requires=[ - "requests==2.31.0", - "loguru==0.5.3", - ], -) diff --git a/core/services/major_tom/src/__init__.py b/core/services/major_tom/src/__init__.py deleted file mode 100644 index e69de29bb2..0000000000 diff --git a/core/services/major_tom/src/core.py b/core/services/major_tom/src/core.py deleted file mode 100644 index 2dfa2d3d80..0000000000 --- a/core/services/major_tom/src/core.py +++ /dev/null @@ -1,137 +0,0 @@ -import gzip -import http -import json -import os -import shutil -import time -from typing import Any, Callable, Dict, List - -import loguru -import requests -import speedtest -from loguru import logger - -from src.typedefs import OnlineStatus - - -def formatter(record: "loguru.Record") -> str: - # Note this function returns the string to be formatted, not the actual message to be logged - record["extra"]["serialized"] = json.dumps(record["message"]) - return "{extra[serialized]}\n" - - -def is_online() -> bool: - return get_latency() > 0 - - -def get_latency() -> float: - try: - servers: List[str] = [] - st = speedtest.Speedtest() - st.get_servers(servers) - best_server = st.get_best_server() - ping = best_server["latency"] - return float(ping) - except Exception: - return -1.0 - - -class TelemetryEngine: - # pylint: disable=too-many-arguments - def __init__( - self, - label: str, - endpoint: str, - s3_endpoint: str, - create_record: Callable[[Any], Any], - interval: float, - max_file_size: int, - max_file_retention: int, - buffer_folder: str, - log_buffer: loguru._logger.Logger, # type: ignore - ): - self.buffer_file = f"{buffer_folder}/{label}_usage.log" - self.buffer_folder = buffer_folder - - self.telemetry_endpoint = endpoint - self.telemetry_s3_endpoint = s3_endpoint - self.create_record = create_record - self.interval = interval - - self.log_buffer = log_buffer - self.log_buffer.add( - self.buffer_file, - rotation=max_file_size, - retention=max_file_retention, - format=formatter, - compression="gz", - ) - - def __call__(self) -> None: - order = 0 - while True: - order += 1 - record = self.create_record(order) - if self.save(record) == "online": - self.process_buffered_records() - time.sleep(self.interval) - - def upload_file(self, file: str) -> bool: - """ - This method requests to telemetry API a presigned url and upload the local archived files. - """ - logger.info(f"uploading file... {file}") - try: - response = requests.get(self.telemetry_s3_endpoint, timeout=5).json() - with open(file, "rb") as fh: - files = {"file": (file, fh)} - r = requests.post(response["url"], data=response["fields"], files=files, timeout=300) - if r.status_code == http.client.NO_CONTENT: - logger.info("[Success!]") - return True - except Exception as error: - logger.info("Ground Control to Major Tom. Your circuit's dead, there's something wrong.") - logger.error(f"error upload log file: {error}") - - return False - - def process_buffered_records(self) -> None: - """ - Check in the buffered folder if there are archived logs to upload. If the agent connects before an archive - is created it will also archive the current buffer file and upload it. - """ - for file in os.listdir(self.buffer_folder): - file_path = os.path.join(self.buffer_folder, file) - - # Upload regular archive - if file_path.endswith(".log.gz") and self.upload_file(file_path): - os.remove(file_path) - continue - - # Archive current buffer and upload it - if file_path == self.buffer_file and os.path.getsize(file_path): - timestamp = int(time.time()) - tmp_name = file_path.replace(".log", f".{timestamp}.log.gz") - with open(file_path, "rb") as f_in, gzip.open(tmp_name, "wb") as f_out: - shutil.copyfileobj(f_in, f_out) - os.remove(file_path) - if self.upload_file(tmp_name): - os.remove(tmp_name) - with open(self.buffer_file, "w", encoding="utf-8"): - # create new empty file if not there - pass - - def save(self, record: Dict[str, Any]) -> OnlineStatus: - """ - Try to POST the telemetry payload, if it fails for any reason, we buffer it locally. - """ - try: - r = requests.post(self.telemetry_endpoint, json=record, timeout=5) - if r.status_code == http.client.CREATED: - return OnlineStatus.ONLINE - except Exception as error: - logger.info("Ground Control to Major Tom. Your circuit's dead, there's something wrong.") - logger.error(f"error posting telemetry to Ground Control: {error}") - - self.log_buffer.info(record) - return OnlineStatus.OFFLINE diff --git a/core/services/major_tom/src/metrics.py b/core/services/major_tom/src/metrics.py deleted file mode 100644 index d26d43d813..0000000000 --- a/core/services/major_tom/src/metrics.py +++ /dev/null @@ -1,48 +0,0 @@ -import http -from functools import cached_property -from typing import List, Optional - -import psutil -import requests -from loguru import logger - -from src.typedefs import ExtensionInfo, VersionInfo - - -class Metrics: - @cached_property - def installed_extensions(self) -> Optional[List[ExtensionInfo]]: - try: - req = requests.get("http://localhost/kraken/v1.0/installed_extensions", timeout=3) - if req.status_code == http.client.OK: - return [ExtensionInfo(identifier=rec["identifier"], tag=rec["tag"]) for rec in req.json()] - except Exception as error: - logger.error(f"Error getting installed extensions: {error}") - return None - return [] - - @cached_property - def disk(self) -> psutil._common.sdiskusage: - return psutil.disk_usage("/") - - @cached_property - def memory(self) -> psutil._pslinux.svmem: - return psutil.virtual_memory() - - @cached_property - def installed_version(self) -> Optional[VersionInfo]: - try: - req = requests.get("http://localhost/version-chooser/v1.0/version/current", timeout=3) - if req.status_code == http.client.OK: - data = req.json() - return VersionInfo( - repository=data["repository"], - tag=data["tag"], - last_modified=data["last_modified"], - sha=data["sha"], - architecture=data["architecture"], - ) - - except Exception as error: - logger.error(f"Error getting version info: {error}") - return None diff --git a/core/services/major_tom/src/typedefs.py b/core/services/major_tom/src/typedefs.py deleted file mode 100644 index 68531c0b6d..0000000000 --- a/core/services/major_tom/src/typedefs.py +++ /dev/null @@ -1,59 +0,0 @@ -from dataclasses import asdict, dataclass -from enum import Enum -from typing import Any, Optional - - -@dataclass -class ExtensionInfo: - identifier: str - tag: str - - -@dataclass -class VersionInfo: - repository: str - tag: str - last_modified: str - sha: str - architecture: str - - -class OnlineStatus(str, Enum): - ONLINE = "online" - OFFLINE = "offline" - UNKNOWN = "unknown" - - -@dataclass -class TelemetryRecord: - pass - - -@dataclass -# pylint: disable=too-many-instance-attributes -class AnonymousTelemetryRecord(TelemetryRecord): - uptime: float - latency: float - memory_size: int - memory_usage: int - disk_size: int - disk_usage: int - extensions: Optional[list[ExtensionInfo]] - blueos_version: Optional[VersionInfo] - probe_time: float - - def json(self) -> dict[str, Any]: - return asdict(self) - - -@dataclass -class DefaultPayload: - log_session_uuid: str - order: int - timestamp: str - hardware_id: str - blueos_id: str - data: Optional[TelemetryRecord] - - def json(self) -> dict[str, Any]: - return asdict(self) diff --git a/core/start-blueos-core b/core/start-blueos-core index 58bb40fe21..83ba86db0e 100755 --- a/core/start-blueos-core +++ b/core/start-blueos-core @@ -94,7 +94,6 @@ SERVICES=( 'nginx',"nice -18 nginx -g \"daemon off;\" -c $TOOLS_PATH/nginx/nginx.conf" 'log_zipper',"nice -20 $SERVICES_PATH/log_zipper/main.py '/shortcuts/system_logs/**/*.log' --max-age-minutes 60" 'bag_of_holding',"$SERVICES_PATH/bag_of_holding/main.py" - 'major_tom',"$SERVICES_PATH/major_tom/main.py" ) tmux -f /etc/tmux.conf start-server