Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: New Metrics endpoint for CRN & CNN #528

Merged
merged 7 commits into from
Dec 11, 2023
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
"""ccn_metric_view

Revision ID: 08602db6c78f
Revises: 3bf484f2cc95
Create Date: 2023-11-27 17:03:28.184344

"""
from alembic import op
import sqlalchemy as sa


# revision identifiers, used by Alembic.
revision = "08602db6c78f"
down_revision = "3bf484f2cc95"
branch_labels = None
depends_on = None


def upgrade() -> None:
op.execute(
"""
create or replace view ccn_metric_view AS
WITH json_data AS (
SELECT
item_hash,
jsonb_array_elements(content->'content'->'metrics'->'ccn') as ccn_data
FROM messages
WHERE channel = 'aleph-scoring' AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4'
)
SELECT
item_hash,
(ccn_data->>'measured_at')::float as measured_at,
ccn_data->>'node_id' as node_id,
(ccn_data->>'base_latency')::float as base_latency,
(ccn_data->>'metrics_latency')::float as metrics_latency,
(ccn_data->>'aggregate_latency')::float as aggregate_latency,
(ccn_data->>'base_latency_ipv4')::float as base_latency_ipv4,
(ccn_data->>'file_download_latency')::float as file_download_latency,
(ccn_data->>'pending_messages')::int as pending_messages,
(ccn_data->>'eth_height_remaining')::int as eth_height_remaining
FROM json_data;
"""
)

op.execute(
"""
create or replace view crn_metric_view AS
WITH json_data AS (
SELECT
item_hash,
jsonb_array_elements(content->'content'->'metrics'->'crn') as crn_data
FROM messages
WHERE channel = 'aleph-scoring' AND sender = '0x4D52380D3191274a04846c89c069E6C3F2Ed94e4'
)
SELECT
item_hash as item_hash,
(crn_data->>'measured_at')::float as measured_at,
crn_data->>'node_id' as node_id,
(crn_data->>'base_latency')::float as base_latency,
(crn_data->>'base_latency_ipv4')::float as base_latency_ipv4,
(crn_data->>'full_check_latency')::float as full_check_latency,
(crn_data->>'diagnostic_vm_latency')::float as diagnostic_vm_latency
FROM json_data;
"""
)
# ### end Alembic commands ###


def downgrade() -> None:
# ### commands auto generated by Alembic - please adjust! ###
op.execute("DROP VIEW ccn_metric_view;")
op.execute("DROP VIEW crn_metric_view;")

# ### end Alembic commands ###
124 changes: 124 additions & 0 deletions src/aleph/db/accessors/metrics.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
from typing import Optional

from sqlalchemy import select, text
from sqlalchemy.orm.session import Session

from aleph.types.db_session import DbSession


def _parse_ccn_result(result):
keys = [
"item_hash",
"measured_at",
"base_latency",
"base_latency_ipv4",
"metrics_latency",
"aggregate_latency",
"file_download_latency",
"pending_messages",
"eth_height_remaining",
]

# Transpose the result and create a dictionary
result_dict = {key: list(values) for key, values in zip(keys, zip(*result))}

return result_dict


def _parse_crn_result(result):
keys = [
"item_hash",
"measured_at",
"base_latency",
"base_latency_ipv4",
"full_check_latency",
"diagnostic_vm_latency",
]

# Transpose the result and create a dictionary
result_dict = {key: list(values) for key, values in zip(keys, zip(*result))}

return result_dict


def _build_metric_filter(select_stmt, node_id, start_date, end_date, sort_order):
if node_id:
select_stmt = select_stmt.where(text("node_id = :node_id")).params(
node_id=node_id
)
if start_date:
select_stmt = select_stmt.where(text("measured_at >= :start_date")).params(
start_date=start_date
)
if end_date:
select_stmt = select_stmt.where(text("measured_at <= :end_date")).params(
end_date=end_date
)
if sort_order:
select_stmt = select_stmt.order_by(text(f"measured_at {sort_order}"))
return select_stmt


def query_metric_ccn(
session: Session,
node_id: Optional[str] = None,
start_date: Optional[float] = None,
end_date: Optional[float] = None,
sort_order: Optional[str] = None,
):
select_stmt = select(
[
text("item_hash"),
text("measured_at"),
text("base_latency"),
text("base_latency_ipv4"),
text("metrics_latency"),
text("aggregate_latency"),
text("file_download_latency"),
text("pending_messages"),
text("eth_height_remaining"),
]
).select_from(text("ccn_metric_view"))

select_stmt = _build_metric_filter(
select_stmt=select_stmt,
node_id=node_id,
start_date=start_date,
end_date=end_date,
sort_order=sort_order,
)

result = session.execute(select_stmt).fetchall()

return _parse_ccn_result(result=result)


def query_metric_crn(
session: DbSession,
node_id: str,
start_date: Optional[float] = None,
end_date: Optional[float] = None,
sort_order: Optional[str] = None,
):
select_stmt = select(
[
text("item_hash"),
text("measured_at"),
text("base_latency"),
text("base_latency_ipv4"),
text("full_check_latency"),
text("diagnostic_vm_latency"),
]
).select_from(text("crn_metric_view"))

select_stmt = _build_metric_filter(
select_stmt=select_stmt,
node_id=node_id,
start_date=start_date,
end_date=end_date,
sort_order=sort_order,
)

result = session.execute(select_stmt).fetchall()

return _parse_crn_result(result=result)
75 changes: 73 additions & 2 deletions src/aleph/web/controllers/main.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
import asyncio
import logging
from dataclasses import asdict
from typing import Dict
from typing import Dict, Optional

import aiohttp_jinja2
from aiohttp import web
from pydantic import BaseModel

from aleph.db.accessors.metrics import query_metric_ccn, query_metric_crn
from aleph.types.db_session import DbSessionFactory
from aleph.web.controllers.app_state_getters import get_node_cache_from_request, get_session_factory_from_request
from aleph.web.controllers.app_state_getters import (
get_node_cache_from_request,
get_session_factory_from_request,
)
from aleph.web.controllers.metrics import format_dataclass_for_prometheus, get_metrics

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -74,3 +79,69 @@ async def metrics_json(request: web.Request) -> web.Response:
text=(await get_metrics(session=session, node_cache=node_cache)).to_json(),
content_type="application/json",
)


class Metrics(BaseModel):
start_date: Optional[float] = None
end_date: Optional[float] = None
sort: Optional[str] = None


def _get_node_id_from_request(request: web.Request) -> str:
address = request.match_info.get("node_id")
if address is None:
raise web.HTTPUnprocessableEntity(body="node_id must be specified.")
return address


async def ccn_metric(request: web.Request) -> web.Response:
"""Fetch metrics for CNN node id"""
MHHukiewitz marked this conversation as resolved.
Show resolved Hide resolved

session_factory: DbSessionFactory = get_session_factory_from_request(request)
query_params = Metrics.parse_obj(request.query)

node_id = _get_node_id_from_request(request)

with session_factory() as session:
ccn = query_metric_ccn(
session,
node_id=node_id,
start_date=query_params.start_date,
end_date=query_params.end_date,
sort_order=query_params.sort,
)
if not ccn:
raise web.HTTPNotFound()

if not ccn["item_hash"]:
raise web.HTTPNotFound()

result = {"metrics": ccn}
return web.json_response(result)


async def crn_metric(request: web.Request) -> web.Response:
"""Fetch Metric for crn."""

session_factory: DbSessionFactory = get_session_factory_from_request(request)
query_params = Metrics.parse_obj(request.query)

node_id = _get_node_id_from_request(request)

with session_factory() as session:
crn = query_metric_crn(
session,
node_id=node_id,
start_date=query_params.start_date,
end_date=query_params.end_date,
sort_order=query_params.sort,
)

if not crn:
raise web.HTTPNotFound()

if not crn["item_hash"]:
raise web.HTTPNotFound()

result = {"metrics": crn}
return web.json_response(result)
7 changes: 4 additions & 3 deletions src/aleph/web/controllers/routes.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ def register_routes(app: web.Application):
app.router.add_get("/metrics", main.metrics)
app.router.add_get("/metrics.json", main.metrics_json)

app.router.add_get("/api/v0/core/{node_id}/metrics", main.ccn_metric)
app.router.add_get("/api/v0/compute/{node_id}/metrics", main.crn_metric)

app.router.add_get(
"/api/v0/aggregates/{address}.json", aggregates.address_aggregate
)
Expand Down Expand Up @@ -58,9 +61,7 @@ def register_routes(app: web.Application):
app.router.add_get(
"/api/v0/addresses/{address}/balance", accounts.get_account_balance
)
app.router.add_get(
"/api/v0/addresses/{address}/files", accounts.get_account_files
)
app.router.add_get("/api/v0/addresses/{address}/files", accounts.get_account_files)

app.router.add_post("/api/v0/ipfs/add_json", storage.add_ipfs_json_controller)
app.router.add_post("/api/v0/storage/add_json", storage.add_storage_json_controller)
Expand Down
Loading
Loading