-
Notifications
You must be signed in to change notification settings - Fork 0
Authentication with JWT, Part 1 #188
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from 23 commits
29a0453
7a56d46
ef1e313
ac41030
fd05781
d7dca4b
4dcc311
125b101
cef80ae
6eb83ad
050b127
b70db0e
6fb3cba
d20fbd2
27afe1e
19ccba6
21e6ae0
f631b4d
e658bae
42881cb
bb96a68
88fca2c
5b545f8
fa748ca
2ffd7fa
55fbcf1
22f8e1f
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
"""Module for handling authentication and token creation.""" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,148 @@ | ||
"""Module for handling token and user creation.""" | ||
|
||
import hashlib | ||
from datetime import datetime, timedelta, timezone | ||
from typing import Any | ||
|
||
import bcrypt | ||
import jwt | ||
from motor.motor_asyncio import AsyncIOMotorDatabase | ||
from pydantic import BaseModel | ||
|
||
from florist.api.db.client_entities import UserDAO | ||
from florist.api.db.server_entities import User | ||
|
||
|
||
ENCRYPTION_ALGORITHM = "HS256" | ||
DEFAULT_USERNAME = "admin" | ||
DEFAULT_PASSWORD = "admin" | ||
TOKEN_EXPIRATION_TIMEDELTA = timedelta(days=7) | ||
|
||
|
||
class Token(BaseModel): | ||
"""Define the Token model.""" | ||
|
||
access_token: str | ||
token_type: str | ||
|
||
class Config: | ||
"""Config for the Token model.""" | ||
|
||
allow_population_by_field_name = True | ||
schema_extra = { | ||
"example": { | ||
"access_token": "LQv3c1yqBWVHxkd0LHAkCOYz6T", | ||
"token_type": "bearer", | ||
}, | ||
} | ||
|
||
|
||
class AuthUser(BaseModel): | ||
"""Define the User model to be returned by the API.""" | ||
|
||
uuid: str | ||
username: str | ||
|
||
class Config: | ||
"""Config for the AuthUser model.""" | ||
|
||
allow_population_by_field_name = True | ||
schema_extra = { | ||
"example": { | ||
"uuid": "LQv3c1yqBWVHxkd0LHAkCOYz6T", | ||
"username": "admin", | ||
}, | ||
} | ||
|
||
|
||
def _simple_hash(word: str) -> str: | ||
""" | ||
Hash a word with sha256. | ||
|
||
WARNING: This is not a secure hash function, it is only meant to obscure | ||
plain text words. DO NOT use this for generating encrypted passwords for the | ||
authentication users. For that, use the _password_hash function instead. | ||
|
||
:param word: (str) the word to hash. | ||
:return: (str) the word hashed as a sha256 hexadecimal string. | ||
""" | ||
return hashlib.sha256(word.encode("utf-8")).hexdigest() | ||
|
||
|
||
|
||
def _password_hash(password: str) -> str: | ||
""" | ||
Hash a password with bcrypt. | ||
|
||
:param password: (str) the password to hash. | ||
:return: (str) the hashed password. | ||
""" | ||
password_bytes = password.encode("utf-8") | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is perhaps too pedantic, but should we first check that the password given is |
||
salt = bcrypt.gensalt() | ||
hashed_password = bcrypt.hashpw(password=password_bytes, salt=salt) | ||
return hashed_password.decode("utf-8") | ||
|
||
|
||
def verify_password(password: str, hashed_password: str) -> bool: | ||
""" | ||
Verify if a password matches a hashed password. | ||
|
||
:param password: (str) the password to verify. | ||
:param hashed_password: (str) the hashed password to verify against. | ||
:return: (bool) True if the password matches the hashed password, False otherwise. | ||
""" | ||
return bcrypt.checkpw(password.encode("utf-8"), hashed_password.encode("utf-8")) | ||
|
||
|
||
async def make_default_server_user(database: AsyncIOMotorDatabase[Any]) -> User: | ||
""" | ||
Make a default server user. | ||
|
||
:param database: (AsyncIOMotorDatabase[Any]) the database to create the user in. | ||
:return: (User) the default server user. | ||
""" | ||
hashed_password = _password_hash(_simple_hash(DEFAULT_PASSWORD)) | ||
user = User(username=DEFAULT_USERNAME, hashed_password=hashed_password) | ||
await user.create(database) | ||
return user | ||
|
||
|
||
def make_default_client_user() -> UserDAO: | ||
""" | ||
Make a default client user. | ||
|
||
:return: (User) the default client user. | ||
""" | ||
hashed_password = _password_hash(_simple_hash(DEFAULT_PASSWORD)) | ||
user = UserDAO(username=DEFAULT_USERNAME, hashed_password=hashed_password) | ||
user.save() | ||
return user | ||
|
||
|
||
def create_access_token( | ||
data: dict[str, Any], secret_key: str, expiration_delta: timedelta = TOKEN_EXPIRATION_TIMEDELTA | ||
) -> str: | ||
""" | ||
Create an access token. | ||
|
||
:param data: (dict) the data to encode in the token. | ||
:param secret_key: (str) the user's secret key to encode the token. | ||
:param expiration_delta: (timedelta) the expiration time of the token. | ||
:return: (str) the access token. | ||
""" | ||
to_encode = data.copy() | ||
expire = datetime.now(timezone.utc) + expiration_delta | ||
to_encode.update({"exp": expire}) | ||
return jwt.encode(to_encode, secret_key, algorithm=ENCRYPTION_ALGORITHM) | ||
|
||
|
||
def decode_access_token(token: str, secret_key: str) -> dict[str, Any]: | ||
""" | ||
Decode an access token. | ||
|
||
:param token: (str) the token to decode. | ||
:param secret_key: (str) the user's secret key to decode the token. | ||
:return: (dict) the decoded token information. | ||
""" | ||
data = jwt.decode(token, secret_key, algorithms=[ENCRYPTION_ALGORITHM]) | ||
assert isinstance(data, dict) | ||
return data |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,7 @@ | ||
"""Definitions for the SQLIte database entities (client database).""" | ||
|
||
import json | ||
import secrets | ||
import sqlite3 | ||
from abc import ABC, abstractmethod | ||
from typing import Optional | ||
|
@@ -55,7 +56,7 @@ def find(cls, uuid: str) -> Self: | |
for result in results: | ||
return cls.from_json(result[1]) | ||
|
||
raise ValueError(f"Client with uuid '{uuid}' not found.") | ||
raise ValueError(f"{cls.table_name} with uuid '{uuid}' not found.") | ||
|
||
@classmethod | ||
def exists(cls, uuid: str) -> bool: | ||
|
@@ -167,3 +168,55 @@ def to_json(self) -> str: | |
"pid": self.pid, | ||
} | ||
) | ||
|
||
|
||
class UserDAO(EntityDAO): | ||
"""Data Access Object (DAO) for the User SQLite entity.""" | ||
|
||
table_name = "User" | ||
|
||
def __init__(self, username: str, hashed_password: str): | ||
""" | ||
Initialize a User entity. | ||
|
||
:param uuid: (str) the UUID of the user. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I might be missing something here, but I don't think uuid is a param? |
||
:param username: (str) the username of the user. | ||
:param hashed_password: (str) the hashed password of the user. | ||
""" | ||
# The UUID for the user is the username | ||
super().__init__(uuid=username) | ||
|
||
self.username = username | ||
self.hashed_password = hashed_password | ||
|
||
# always create a new random secret key | ||
self.secret_key = secrets.token_hex(32) | ||
|
||
@classmethod | ||
def from_json(cls, json_data: str) -> Self: | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Any reason you don't just type this as There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I honestly don't know which is the canonical "right" way to do this, but you can use the more "obvious" type |
||
""" | ||
Convert from a JSON string into an instance of User. | ||
|
||
:param json_data: the user's data as a JSON string. | ||
:return: (Self) and instancxe of UserDAO populated with the JSON data. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. instance? |
||
""" | ||
data = json.loads(json_data) | ||
user = cls(data["username"], data["hashed_password"]) | ||
user.uuid = data["uuid"] | ||
user.secret_key = data["secret_key"] | ||
return user | ||
|
||
def to_json(self) -> str: | ||
""" | ||
Convert the user data into a JSON string. | ||
|
||
:return: (str) the user data as a JSON string. | ||
""" | ||
return json.dumps( | ||
{ | ||
"uuid": self.uuid, | ||
"username": self.username, | ||
"hashed_password": self.hashed_password, | ||
"secret_key": self.secret_key, | ||
} | ||
) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is probably going to be a dumb question, by what is the point of these inner classes? Perhaps they are expected to be defined for objects inheriting from BaseModel and are therefore used somewhere in outside library code, but I don't see where these get used in our stuff 🙂 (Some question for the inner class of AuthUser.