-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathjwta.py
191 lines (160 loc) · 6.44 KB
/
jwta.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
"""
This module provides an authentication class, Authenticator, for handling JWT-based
authentication in Streamlit applications using API calls.
"""
from datetime import timedelta, datetime
from typing import Optional
import extra_streamlit_components as stx
import requests
import streamlit as st
from utils import setup_session_keys
class Authenticator:
    """
    Handle JWT-based authentication in Streamlit applications through an HTTP API.

    The class renders a login form, posts the credentials to the configured endpoint,
    stores every item of the response as a browser cookie and mirrors the login state
    in ``st.session_state['authentication_status']``.

    Parameters:
    - url (str): The authentication endpoint URL.
    - method (str, optional): The HTTP method for authentication requests.
    Defaults to "post".
    - headers (dict, optional): Additional headers to include in authentication requests.
    - response_handler (callable, optional): A function that receives the HTTP response
    and returns the dict to cache. When omitted, the response JSON body is used.
    - token_key (str, optional): The key to identify the authentication token in the response.
    Defaults to "access".
    - cookie_lifetime (timedelta, optional): The lifetime of the authentication cookie.
    Defaults to timedelta(minutes=15).
    """

    def __init__(self,
                 url: str,
                 method: str = "post",
                 headers: Optional[dict] = None,
                 response_handler=None,
                 token_key: str = "access",
                 cookie_lifetime: timedelta = timedelta(minutes=15)
                 ):
        """
        Initializes the Authenticator instance with the specified parameters.
        """
        self.url = url
        self.method = method
        self.headers = headers
        self.response_handler = response_handler
        self.token_key = token_key
        self.cookie_lifetime = cookie_lifetime
        self.cookie_manager = stx.CookieManager()
        # Names of the cookies written by this instance, so logout can remove them.
        self.cookie_keys = []
        # NOTE(review): presumably creates the session-state keys read below
        # ('authentication_status', 'email') - confirm in utils.setup_session_keys.
        setup_session_keys()

    def _set_error(self, message):
        """
        Internal method to report an authentication error to the user.

        Parameters:
        - message (str): The error message to display.
        """
        st.error(message)

    def _check_cookie(self):
        """
        Internal method to check the authentication status based on the stored cookie.

        The result is also written to ``st.session_state['authentication_status']``.

        Returns:
            bool: True if a non-empty token cookie is present, False otherwise.
        """
        authentication_status = bool(self.cookie_manager.get(self.token_key))
        st.session_state['authentication_status'] = authentication_status
        return authentication_status

    def _cache_response(self, response):
        """
        Internal method to cache the authentication response in cookies.

        Each key/value pair becomes one cookie that expires ``cookie_lifetime`` from now.

        Parameters:
        - response (dict): The authentication response to cache.
        """
        # Compute the expiry once so every cookie of this login expires together.
        expires_at = datetime.now() + self.cookie_lifetime
        for name, value in response.items():
            # The cookie name doubles as the component key so each call is unique.
            self.cookie_manager.set(name, value, key=name, expires_at=expires_at)
            if name not in self.cookie_keys:
                self.cookie_keys.append(name)

    def _check_credentials(self, email, password):
        """
        Internal method to authenticate the user with provided credentials.

        Parameters:
        - email (str): The email for authentication.
        - password (str): The password for authentication.

        Returns:
            bool: True if authentication is successful, False otherwise.
        """
        if not email or not password:
            self._set_error("Please provide email and password")
            # Stop here: never send a request with missing credentials.
            return False
        try:
            response = requests.request(
                method=self.method,
                url=self.url,
                headers=self.headers,
                data={"email": email, "password": password},
                timeout=5
            )
        except requests.RequestException as exc:
            # Connection failures and timeouts are reported instead of crashing the app.
            self._set_error(f"Authentication request failed: {exc}")
            return False
        if not response.ok:
            self._set_error(f"Response: {response.status_code}. Detail {response.text}")
            return False
        if self.response_handler:
            payload = self.response_handler(response)
        else:
            try:
                payload = response.json()
            except ValueError:
                # requests raises a ValueError subclass when the body is not JSON.
                self._set_error("Authentication response is not valid JSON")
                return False
        self._cache_response(payload)
        st.session_state['authentication_status'] = True
        return True

    def _implement_logout(self):
        """
        Internal method to implement logout functionality by clearing cookies and session state.
        """
        # dict.fromkeys removes duplicates while keeping order: the token cookie is
        # normally also listed in cookie_keys and must not be deleted twice.
        for name in dict.fromkeys([self.token_key, *self.cookie_keys]):
            try:
                # NOTE(review): a distinct component key per cookie mirrors the
                # `key=` usage of CookieManager.set above - confirm against the
                # installed extra_streamlit_components version.
                self.cookie_manager.delete(name, key=f"delete_{name}")
            except KeyError:
                # The cookie is already gone (e.g. it expired); nothing left to remove.
                pass
        self.cookie_keys.clear()
        st.session_state['email'] = None
        st.session_state['authentication_status'] = None

    def login(self, location: str = 'main'):
        """
        Method to display a login form and handle authentication.

        Nothing is rendered when the session or the token cookie already marks the
        user as authenticated.

        Parameters:
        - location (str, optional): Location to display the login form, either 'main' or 'sidebar'.
        Defaults to 'main'.

        Usage:
        ```
        authenticator = Authenticator(...)
        authenticator.login()
        ```
        """
        if not st.session_state['authentication_status']:
            self._check_cookie()
        if st.session_state['authentication_status']:
            return
        if location == 'main':
            login_form = st.form('JWTLogin')
        elif location == 'sidebar':
            login_form = st.sidebar.form('JWTLogin')
        else:
            self._set_error('Invalid location! Available locations: ["main", "sidebar"].')
            # No form can be built for an unknown location.
            return
        email = login_form.text_input("email")
        password = login_form.text_input('Password', type='password')
        st.session_state['email'] = email
        if login_form.form_submit_button("Login"):
            self._check_credentials(email, password)

    def logout(
        self,
        location: str = 'main',
        button_name: str = 'Logout',
        key: Optional[str] = None
    ):
        """
        Method to display a logout button and handle user logout.

        Parameters:
        - location (str, optional): Location to display the logout button,
        either 'main' or 'sidebar'. Defaults to 'main'.
        - button_name (str, optional): The label for the logout button.
        Defaults to 'Logout'.
        - key (str, optional): A key to associate with the logout button for Streamlit.
        Defaults to None.

        Usage:
        ```
        authenticator = Authenticator(...)
        authenticator.login()
        if st.session_state["authentication_status"]:
            authenticator.logout()
        ```
        """
        if location == 'main':
            clicked = st.button(button_name, key=key)
        elif location == 'sidebar':
            clicked = st.sidebar.button(button_name, key=key)
        else:
            # Same reporting as login() instead of silently rendering nothing.
            self._set_error('Invalid location! Available locations: ["main", "sidebar"].')
            return
        if clicked:
            self._implement_logout()