Skip to content

Commit

Permalink
refactor: use different profile types for platform/cluster features
Browse files Browse the repository at this point in the history
  • Loading branch information
fubuloubu committed Jul 10, 2024
1 parent a18aeb5 commit c48e9df
Show file tree
Hide file tree
Showing 6 changed files with 314 additions and 248 deletions.
173 changes: 100 additions & 73 deletions silverback/_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,22 @@
verbosity_option,
)
from ape.exceptions import Abort
from fief_client.integrations.cli import FiefAuthNotAuthenticatedError
from fief_client import Fief
from fief_client.integrations.cli import FiefAuth, FiefAuthNotAuthenticatedError
from taskiq import AsyncBroker
from taskiq.cli.worker.run import shutdown_broker
from taskiq.receiver import Receiver

from silverback._importer import import_from_string
from silverback.platform.client import DEFAULT_PROFILE, PlatformClient
from silverback.platform.types import ClusterConfiguration, ClusterTier
from silverback.cluster.client import Client, ClusterClient, PlatformClient
from silverback.cluster.settings import (
DEFAULT_PROFILE,
PROFILE_PATH,
ClusterProfile,
PlatformProfile,
ProfileSettings,
)
from silverback.cluster.types import ClusterConfiguration, ClusterTier
from silverback.runner import PollingRunner, WebsocketRunner


Expand Down Expand Up @@ -147,34 +155,64 @@ def worker(cli_ctx, account, workers, max_exceptions, shutdown_timeout, path):
asyncio.run(run_worker(app.broker, worker_count=workers, shutdown_timeout=shutdown_timeout))


def platform_client(display_userinfo: bool = True):
def get_client(ctx, param, value) -> PlatformClient:
client = PlatformClient(profile_name=value)
def get_auth(profile_name: str = DEFAULT_PROFILE) -> FiefAuth:
settings = ProfileSettings.from_config_file()
auth_info = settings.auth[profile_name]
fief = Fief(auth_info.host, auth_info.client_id)
return FiefAuth(fief, str(PROFILE_PATH.parent / f"{profile_name}.json"))

# NOTE: We need to be authenticated to display userinfo
if not display_userinfo:
return client

try:
userinfo = client.userinfo # cache this
except FiefAuthNotAuthenticatedError as e:
raise click.UsageError("Not authenticated, please use `silverback login` first.") from e
def display_login_message(auth: FiefAuth, host: str):
userinfo = auth.current_user()

user_id = userinfo["sub"]
username = userinfo["fields"].get("username")
click.echo(
f"{click.style('INFO', fg='blue')}: "
f"Logged in as '{click.style(username if username else user_id, bold=True)}'"
)
return client
user_id = userinfo["sub"]
username = userinfo["fields"].get("username")
click.echo(
f"{click.style('INFO', fg='blue')}: "
f"Logged in to '{click.style(host, bold=True)}' as "
f"'{click.style(username if username else user_id, bold=True)}'"
)


def client_option():
settings = ProfileSettings.from_config_file()

def get_client_from_profile(ctx, param, value) -> Client:
if not (profile := settings.profile.get(value)):
raise click.BadOptionUsage(option_name=param, message=f"Unknown profile '{value}'.")

if isinstance(profile, PlatformProfile):
auth = get_auth(profile.auth)

try:
display_login_message(auth, profile.host)
except FiefAuthNotAuthenticatedError as e:
raise click.UsageError(
"Not authenticated, please use `silverback login` first."
) from e

return PlatformClient(
base_url=profile.host,
cookies=dict(session=auth.access_token_info()["access_token"]),
)

elif isinstance(profile, ClusterProfile):
click.echo(
f"{click.style('INFO', fg='blue')}: Logged in to "
f"'{click.style(profile.host, bold=True)}' using API Key"
)
return ClusterClient(
base_url=profile.host,
headers={"X-API-Key": profile.api_key},
)

return click.option(
"-p",
"--profile",
"platform_client",
"client",
default=DEFAULT_PROFILE,
callback=get_client,
help="Profile to use for Authentication and Platform API Host.",
callback=get_client_from_profile,
help="Profile to use for connecting to Cluster Host.",
)


Expand All @@ -183,46 +221,42 @@ class PlatformCommands(click.Group):
def list_commands(self, ctx: click.Context) -> list[str]:
return list(self.commands)

def command(self, *args, display_userinfo=True, **kwargs):
profile_option = platform_client(display_userinfo=display_userinfo)
outer = super().command

def decorator(fn):
return outer(*args, **kwargs)(profile_option(fn))

return decorator


@cli.group(cls=PlatformCommands)
def cluster():
"""Connect to hosted application clusters"""


@cluster.command(display_userinfo=False) # Otherwise would fail because not authorized
def login(platform_client: PlatformClient):
@cluster.command()
@click.option(
"-a",
"--auth-profile",
"auth",
default=DEFAULT_PROFILE,
callback=lambda ctx, param, value: get_auth(value),
help="Authentication profile to use for Platform login.",
)
def login(auth: FiefAuth):
"""Login to hosted clusters"""
platform_client.auth.authorize()
userinfo = platform_client.userinfo # cache this
user_id = userinfo["sub"]
username = userinfo["fields"]["username"]
click.echo(
f"{click.style('SUCCESS', fg='green')}: Logged in as "
f"'{click.style(username, bold=True)}' (UUID: {user_id})"
)

auth.authorize()
display_login_message(auth, auth.client.base_url)


@cluster.command()
def workspaces(platform_client: PlatformClient):
@client_option()
def workspaces(client: Client):
"""List available workspaces"""

user_id = platform_client.userinfo["sub"]
if workspaces := platform_client.workspaces:
if not isinstance(client, PlatformClient):
raise click.UsageError("This feature is not available when directly connected to a cluster")

if workspaces := client.workspaces:
for workspace_slug, workspace_info in workspaces.items():
click.echo(f"{workspace_slug}:")
click.echo(f" id: {workspace_info.id}")
click.echo(f" name: {workspace_info.name}")
is_owner = str(workspace_info.owner_id) == user_id
click.echo(f" owner: {is_owner}")
click.echo(f" owner: {workspace_info.owner_id}")

else:
click.secho(
Expand All @@ -234,10 +268,15 @@ def workspaces(platform_client: PlatformClient):


@cluster.command(name="list")
@client_option()
@click.option("-w", "--workspace", "workspace_name")
def list_clusters(platform_client: PlatformClient, workspace_name: str):
def list_clusters(client: Client, workspace_name: str):
"""List available clusters"""
if not (workspace := platform_client.workspaces.get(workspace_name)):

if not isinstance(client, PlatformClient):
raise click.UsageError("This feature is not available when directly connected to a cluster")

if not (workspace := client.workspaces.get(workspace_name)):
raise click.BadOptionUsage("workspace_name", f"Unknown workspace '{workspace_name}'")

if clusters := workspace.clusters:
Expand All @@ -262,6 +301,7 @@ def list_clusters(platform_client: PlatformClient, workspace_name: str):


@cluster.command(name="new")
@client_option()
@click.option("-w", "--workspace", "workspace_name")
@click.option(
"-n",
Expand All @@ -277,14 +317,18 @@ def list_clusters(platform_client: PlatformClient, workspace_name: str):
)
@click.option("-c", "--config", "config_updates", type=(str, str), multiple=True)
def new_cluster(
platform_client: PlatformClient,
client: Client,
workspace_name: str,
cluster_name: str,
tier: str,
config_updates: list[tuple[str, str]],
):
"""Create a new cluster"""
if not (workspace := platform_client.workspaces.get(workspace_name)):

if not isinstance(client, PlatformClient):
raise click.UsageError("This feature is not available when directly connected to a cluster")

if not (workspace := client.workspaces.get(workspace_name)):
raise click.BadOptionUsage("workspace_name", f"Unknown workspace '{workspace_name}'")

base_configuration = getattr(ClusterTier, tier.upper()).configuration()
Expand All @@ -302,37 +346,20 @@ def new_cluster(


@cluster.command()
@client_option()
@click.option("-w", "--workspace", "workspace_name")
@click.option(
"-c",
"--cluster",
"cluster_name",
help="Name of cluster to connect with.",
required=True,
)
def bots(platform_client: PlatformClient, workspace_name: str, cluster_name: str):
def bots(client: Client, workspace_name: str, cluster_name: str):
"""List all bots in a cluster"""
if not (workspace := platform_client.workspaces.get(workspace_name)):
raise click.BadOptionUsage("workspace_name", f"Unknown workspace '{workspace_name}'")

if not (cluster := workspace.clusters.get(cluster_name)):
if clusters := "', '".join(workspace.clusters):
message = f"'{cluster_name}' is not a valid cluster, must be one of: '{clusters}'."

else:
suggestion = (
"Check out https://silverback.apeworx.io "
"for more information on how to get started"
)
message = "You have no valid clusters to chose from\n\n" + click.style(
suggestion, bold=True
)
raise click.BadOptionUsage(
option_name="cluster_name",
message=message,
)
if not isinstance(client, ClusterClient):
client = client.get_cluster_client(workspace_name, cluster_name)

if bots := cluster.bots:
if bots := client.bots:
click.echo("Available Bots:")
for bot_name, bot_info in bots.items():
click.echo(f"- {bot_name} (UUID: {bot_info.id})")
Expand Down
File renamed without changes.
Loading

0 comments on commit c48e9df

Please sign in to comment.