This repository has been archived by the owner on Mar 4, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathrequires.py
199 lines (176 loc) · 6.59 KB
/
requires.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
#!/usr/local/sbin/charm-env python3
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from typing import List
from charms.reactive import (
Endpoint,
toggle_flag,
)
from charmhelpers.core.hookenv import log
try:
from .models import Taint, Label
except ImportError:
# when this code is under test...it's not installed in a package
# so catching this exception is simply for the test framework
from models import Taint, Label
class KubeControlRequirer(Endpoint):
    """
    Implements the kubernetes-worker side of the kube-control interface.
    """

    def manage_flags(self):
        """
        Set states corresponding to the data we have.

        The ``connected`` flag follows ``is_joined``. Every ``*.available``
        flag is set only while the relation is joined and the matching data
        check is truthy; otherwise the flag is cleared.
        """
        joined = self.is_joined
        toggle_flag(self.expand_name("{endpoint_name}.connected"), joined)

        # (flag name, zero-argument check). The checks are callables so that
        # they are only evaluated while the relation is joined.
        availability_checks = (
            ("{endpoint_name}.dns.available", self.dns_ready),
            ("{endpoint_name}.auth.available", self._has_auth_credentials),
            ("{endpoint_name}.cluster_tag.available", self.get_cluster_tag),
            (
                "{endpoint_name}.registry_location.available",
                self.get_registry_location,
            ),
            (
                "{endpoint_name}.controller_taints.available",
                self.get_controller_taints,
            ),
            (
                "{endpoint_name}.controller_labels.available",
                self.get_controller_labels,
            ),
            ("{endpoint_name}.cohort_keys.available", lambda: self.cohort_keys),
            (
                # Any non-None value (even an empty one) counts as available.
                "{endpoint_name}.default_cni.available",
                lambda: self.get_default_cni() is not None,
            ),
            ("{endpoint_name}.api_endpoints.available", self.get_api_endpoints),
        )
        for flag, has_data in availability_checks:
            toggle_flag(self.expand_name(flag), bool(joined and has_data()))

    def get_auth_credentials(self, user):
        """
        Return the authentication credentials for the given user.

        The "creds" mappings received from all joined units are merged; when
        several units publish an entry for the same user, the last unit
        iterated wins.

        Returns a dict with the keys "user", "kubelet_token", "proxy_token"
        and "client_token", or None when no credentials were received for
        that user. A KeyError propagates if the user's entry lacks one of
        the token keys.
        """
        creds = {}
        for unit in self.all_joined_units:
            # "or {}" guards against a unit publishing an explicit null.
            creds.update(unit.received.get("creds") or {})

        if user not in creds:
            return None

        user_creds = creds[user]
        return {
            "user": user,
            "kubelet_token": user_creds["kubelet_token"],
            "proxy_token": user_creds["proxy_token"],
            "client_token": user_creds["client_token"],
        }

    def get_dns(self):
        """
        Return DNS info provided by the control-plane.

        The result always contains the keys "port", "domain", "sdn-ip" and
        "enable-kube-dns"; a value is None when it has not been received.
        Values are read from the raw received data.
        """
        rx = self.all_joined_units.received_raw
        return {
            "port": rx.get("port"),
            "domain": rx.get("domain"),
            "sdn-ip": rx.get("sdn-ip"),
            "enable-kube-dns": rx.get("enable-kube-dns"),
        }

    def dns_ready(self):
        """
        Return True if we have all DNS info from the control-plane.

        The info counts as complete once "enable-kube-dns" has been
        received; the other values may still be None.
        """
        keys = ["port", "domain", "sdn-ip", "enable-kube-dns"]
        dns_info = self.get_dns()
        # get_dns() always fills in all four keys, so in practice the
        # "enable-kube-dns" value is what decides readiness.
        return (
            set(dns_info) == set(keys)
            and dns_info["enable-kube-dns"] is not None
        )

    def set_auth_request(self, kubelet, group="system:nodes"):
        """
        Tell the control-plane that we are requesting auth, and to use this
        hostname for the kubelet system account.

        Param kubelet - Published as "kubelet_user" on every relation.

        Param group - Determines the level of elevated privileges of the
        requested user. Can be overridden to request sudo level access on the
        cluster via changing to system:masters.
        """
        request = {"kubelet_user": kubelet, "auth_group": group}
        for relation in self.relations:
            relation.to_publish_raw.update(request)

    def set_gpu(self, enabled=True):
        """
        Tell the control-plane that we're gpu-enabled (or not).

        Param enabled - Value published as "gpu" on every relation.
        """
        log("Setting gpu={} on kube-control relation".format(enabled))
        for relation in self.relations:
            relation.to_publish_raw.update({"gpu": enabled})

    def _has_auth_credentials(self):
        """
        Predicate method to signal we have authentication credentials.

        Returns True when a non-empty "creds" value has been received,
        False otherwise.
        """
        return bool(self.all_joined_units.received_raw.get("creds"))

    def get_cluster_tag(self):
        """
        Tag for identifying resources that are part of the cluster.

        Returns the received "cluster-tag" value, or None if absent.
        """
        return self.all_joined_units.received_raw.get("cluster-tag")

    def get_registry_location(self):
        """
        URL for container image registry.

        Returns the received "registry-location" value, or None if absent.
        """
        return self.all_joined_units.received_raw.get("registry-location")

    @property
    def cohort_keys(self):
        """
        The cohort snapshot keys sent by the control-planes.
        """
        return self.all_joined_units.received["cohort-keys"]

    def get_default_cni(self):
        """
        Default CNI network to use.
        """
        return self.all_joined_units.received["default-cni"]

    def get_api_endpoints(self):
        """
        Returns a list of API endpoint URLs.

        Endpoints from all joined units are de-duplicated and sorted.
        """
        endpoints = set()
        for unit in self.all_joined_units:
            endpoints.update(unit.received["api-endpoints"] or [])
        return sorted(endpoints)

    @property
    def has_xcp(self):
        """
        The flag indicating whether an external cloud provider is in use.

        Defaults to False when "has-xcp" has not been received.
        """
        return self.all_joined_units.received.get("has-xcp", False)

    def get_controller_taints(self) -> List[Taint]:
        """Returns a list of taints configured on the control-plane nodes."""
        # "or []" also covers a key that is present but null.
        taints = self.all_joined_units.received.get("taints") or []
        return [Taint.decode(taint) for taint in taints]

    def get_controller_labels(self) -> List[Label]:
        """Returns a list of labels configured on the control-plane nodes."""
        # "or []" also covers a key that is present but null.
        labels = self.all_joined_units.received.get("labels") or []
        return [Label.decode(label) for label in labels]