From 4ede234498e04277956f84358bedc13d84fed9c1 Mon Sep 17 00:00:00 2001 From: Impre-visible Date: Sun, 17 Nov 2024 15:55:04 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=94=A5files:=20Removes=20unused=20route?= =?UTF-8?q?=20files?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- src/chocolate_app/routes/arr.py | 233 ----------------- src/chocolate_app/routes/libraries.py | 349 -------------------------- src/chocolate_app/routes/settings.py | 145 ----------- src/chocolate_app/routes/users.py | 296 ---------------------- 4 files changed, 1023 deletions(-) delete mode 100644 src/chocolate_app/routes/arr.py delete mode 100644 src/chocolate_app/routes/libraries.py delete mode 100644 src/chocolate_app/routes/settings.py delete mode 100644 src/chocolate_app/routes/users.py diff --git a/src/chocolate_app/routes/arr.py b/src/chocolate_app/routes/arr.py deleted file mode 100644 index c871ef6..0000000 --- a/src/chocolate_app/routes/arr.py +++ /dev/null @@ -1,233 +0,0 @@ - -from tmdbv3api import Find # type: ignore -from flask import Blueprint, jsonify, request -from pyarr import LidarrAPI, RadarrAPI, ReadarrAPI, SonarrAPI - -from chocolate_app import config - -arr_bp = Blueprint("arr", __name__) - - -@arr_bp.route("/lookup", methods=["POST"]) -def lookup(): - json_file = request.get_json() - media_type = json_file["mediaType"] - query = json_file["query"] - - if media_type == "movie": - radarr_api_key = config["APIKeys"]["radarr"] - radarr_url = config["ARRSettings"]["radarrurl"] - radarr = RadarrAPI(radarr_url, radarr_api_key) - search_results = radarr.lookup_movie(query) - return jsonify(search_results) - elif media_type == "serie": - sonarr_api_key = config["APIKeys"]["sonarr"] - sonarr_url = config["ARRSettings"]["sonarrurl"] - sonarr = SonarrAPI(sonarr_url, sonarr_api_key) - search_results = sonarr.lookup_series(query) - return jsonify(search_results) - elif media_type == "music": - lidarr_api_key = config["APIKeys"]["lidarr"] - lidarr_url = config["ARRSettings"]["lidarrurl"] - lidarr = LidarrAPI(lidarr_url, lidarr_api_key) - search_results = lidarr.lookup(query) - return jsonify(search_results) - elif media_type == "book": - readarr_api_key = config["APIKeys"]["readarr"] - readarr_url = config["ARRSettings"]["readarrurl"] - readarr = ReadarrAPI(readarr_url, readarr_api_key) - search_results = readarr.lookup_book(term=query) - return jsonify(search_results) - - -@arr_bp.route("/list_qualities/", methods=["GET"]) -def list_qualities(media_type): - if media_type == "movie": - radarr_api_key = config["APIKeys"]["radarr"] - radarr_url = config["ARRSettings"]["radarrurl"] - radarr = RadarrAPI(radarr_url, radarr_api_key) - quality_list = radarr.get_quality_profile() - - real_quality_list = [] - - for quality in quality_list: - real_quality_list.append({"id": quality["id"], "name": quality["name"]}) - - # order the list by name - real_quality_list = sorted(real_quality_list, key=lambda k: k["name"].lower()) - - return jsonify(real_quality_list) - elif media_type == "serie": - sonarr_api_key = config["APIKeys"]["sonarr"] - sonarr_url = config["ARRSettings"]["sonarrurl"] - sonarr = SonarrAPI(sonarr_url, sonarr_api_key) - quality_list = sonarr.get_quality_profile() - - real_quality_list = [] - - for quality in quality_list: - real_quality_list.append({"id": quality["id"], "name": quality["name"]}) - - # order the list by name - real_quality_list = sorted(real_quality_list, key=lambda k: k["name"].lower()) - - return jsonify(real_quality_list) - - elif media_type == "music": - lidarr_api_key = config["APIKeys"]["lidarr"] - lidarr_url = config["ARRSettings"]["lidarrurl"] - lidarr = LidarrAPI(lidarr_url, lidarr_api_key) - quality_list = lidarr.get_quality_profile() - - real_quality_list = [] - - for quality in quality_list: - real_quality_list.append({"id": quality["id"], "name": quality["name"]}) - - # order the list by name - real_quality_list = sorted(real_quality_list, key=lambda k: k["name"].lower()) - - return jsonify(real_quality_list) - - elif media_type == "book": - readarr_api_key = config["APIKeys"]["readarr"] - readarr_url = config["ARRSettings"]["readarrurl"] - readarr = ReadarrAPI(readarr_url, readarr_api_key) - quality_list = readarr.get_quality_profile() - - real_quality_list = [] - - for quality in quality_list: - real_quality_list.append({"id": quality["id"], "name": quality["name"]}) - - # order the list by name - real_quality_list = sorted(real_quality_list, key=lambda k: k["name"].lower()) - - return jsonify(real_quality_list) - - return jsonify( - [ - { - "id": 1, - "name": "There's not quality profile, you must create one in the app", - } - ] - ) - - -@arr_bp.route("/list_language_profiles/", methods=["GET"]) -def list_language_profiles(media_type): - if media_type == "serie": - sonarr_api_key = config["APIKeys"]["sonarr"] - sonarr_url = config["ARRSettings"]["sonarrurl"] - sonarr = SonarrAPI(sonarr_url, sonarr_api_key) - languages = sonarr.get_language_profile() - real_languages = [] - saved_ids = [] - for language in languages: - the_languages = language["languages"] - for the_language in the_languages: - if the_language["allowed"]: - if the_language["language"]["id"] not in saved_ids: - saved_ids.append(the_language["language"]["id"]) - real_languages.append(the_language["language"]) - return jsonify(real_languages) - return jsonify( - [ - { - "id": 1, - "name": "There's not language profile, you must create one in the app", - } - ] - ) - - -@arr_bp.route("/add_media", methods=["POST"]) -def add_media(): - media_type = request.get_json()["mediaType"] - media_id = request.get_json()["ID"] - quality_profile = request.get_json()["qualityID"] - term = request.get_json()["term"] - - if media_type == "movie": - radarr_folder = config["ARRSettings"]["radarrFolder"] - radarr_api_key = config["APIKeys"]["radarr"] - radarr_url = config["ARRSettings"]["radarrurl"] - radarr = RadarrAPI(radarr_url, radarr_api_key) - # get all quality : print(radarr.get_quality_profile()) - movie = radarr.lookup_movie(term=term)[int(media_id)] - radarr.add_movie( - movie=movie, quality_profile_id=int(quality_profile), root_dir=radarr_folder - ) - elif media_type == "serie": - language_id = request.get_json()["languageId"] - sonarr_folder = config["ARRSettings"]["sonarrFolder"] - sonarr_api_key = config["APIKeys"]["sonarr"] - sonarr_url = config["ARRSettings"]["sonarrurl"] - language_id = request.get_json()["languageId"] - sonarr = SonarrAPI(sonarr_url, sonarr_api_key) - serie = sonarr.lookup_series(term=term)[int(media_id)] - sonarr.add_series( - series=serie, - quality_profile_id=int(quality_profile), - root_dir=sonarr_folder, - language_profile_id=int(language_id), - ) - elif media_type == "music": - file_type = request.get_json()["type"] - lidarr_folder = config["ARRSettings"]["lidarrFolder"] - lidarr_api_key = config["APIKeys"]["lidarr"] - lidarr_url = config["ARRSettings"]["lidarrurl"] - lidarr = LidarrAPI(lidarr_url, lidarr_api_key) - # print(f"mediaID: {mediaID} | quality_profile: {quality_profile} | lidarrFolder: {lidarrFolder}") - if file_type == "album": - album = lidarr.lookup(term=term)[int(media_id)]["album"] - lidarr.add_album( - album=album, - quality_profile_id=int(quality_profile), - root_dir=lidarr_folder, - ) - elif file_type == "artist": - artist = lidarr.lookup(term=term)[int(media_id)] - lidarr.add_artist( - artist=artist, - quality_profile_id=int(quality_profile), - root_dir=lidarr_folder, - ) - elif media_type == "book": - readarr_folder = config["ARRSettings"]["readarrFolder"] - readarr_api_key = config["APIKeys"]["readarr"] - readarr_url = config["ARRSettings"]["readarrurl"] - readarr = ReadarrAPI(readarr_url, readarr_api_key) - - readarr.add_book( - db_id=int(media_id), - quality_profile_id=int(quality_profile), - root_dir=readarr_folder, - book_id_type="goodreads", - ) - - return jsonify({"status": "ok"}) - - -@arr_bp.route("/get_tmdb_poster", methods=["POST"]) -def get_imdb_poster(): - json_file = request.get_json() - if "imdbId" in json_file: - imdb_id = json_file["imdbId"] - find = Find() - media = find.find_by_imdb_id(imdb_id) - url = "" - if media: - try: - for movie in media["movie_results"]: - url = f"https://www.themoviedb.org/t/p/w600_and_h900_bestv2{movie['poster_path']}" - break - for serie in media["tv_results"]: - url = f"https://www.themoviedb.org/t/p/w600_and_h900_bestv2{serie['poster_path']}" - break - except Exception: - url = "/static/img/broken.webp" - return jsonify({"url": url}) - else: - return jsonify({"url": "/static/img/broken.webp"}) diff --git a/src/chocolate_app/routes/libraries.py b/src/chocolate_app/routes/libraries.py deleted file mode 100644 index f3afb89..0000000 --- a/src/chocolate_app/routes/libraries.py +++ /dev/null @@ -1,349 +0,0 @@ -import json -import natsort - -from operator import itemgetter -from flask import Blueprint, jsonify, request, abort - -import chocolate_app.scans as scans -from chocolate_app import DB, all_auth_tokens -from chocolate_app.plugins_loader import events -from chocolate_app.utils.utils import generate_log, check_authorization, check_admin - -from chocolate_app.tables import ( - Libraries, - LibrariesMerge, - Users, - Movies, - Series, - Seasons, - Episodes, - Games, - OthersVideos, -) - -libraries_bp = Blueprint("libraries", __name__) - - -@libraries_bp.route("/get_all_libraries", methods=["GET"]) -def get_all_libraries(): - token = request.headers.get("Authorization") - if token not in all_auth_tokens: - abort(401) - - user = all_auth_tokens[token]["user"] - user = Users.query.filter_by(name=user).first() - - libraries = Libraries.query.filter_by().all() - libraries_list = [library.__dict__ for library in libraries] - for library in libraries_list: - del library["_sa_instance_state"] - if user.account_type != "Admin": - for library in libraries_list: - if library["available_for"] is not None: - available_for = str(library["available_for"]).split(",") - if str(user.id) not in available_for: - libraries_list.remove(library) - - libraries = sorted(libraries_list, key=lambda k: k["name"].lower()) - libraries = sorted(libraries_list, key=lambda k: k["type"].lower()) - - for library in libraries: - child_libs = LibrariesMerge.query.filter_by(parent_lib=library["name"]).all() - if child_libs is []: - continue - child_libs_bis = [] - for child in child_libs: - if not child: - continue - child_libs.append(child.child_lib) - - for child in child_libs: - for lib in libraries: - if lib["name"] == child: - libraries.remove(lib) - - generate_log(request, "SERVER") - - return jsonify(libraries) - - -@libraries_bp.route("/get_all_libraries_created") -def get_all_libraries_created(): - token = request.headers.get("Authorization") - if token not in all_auth_tokens: - abort(401) - - user = all_auth_tokens[token]["user"] - user = Users.query.filter_by(name=user).first() - - libraries = Libraries.query.filter_by().all() - libraries_list = [library.__dict__ for library in libraries] - for library in libraries_list: - del library["_sa_instance_state"] - # check if lib already have a parent - parent = LibrariesMerge.query.filter_by(child_lib=library["name"]).first() - if parent is not None: - library["merge_parent"] = parent.parent_lib - # if lib is a parent, can't be a child - child = LibrariesMerge.query.filter_by(parent_lib=library["name"]).first() - if child is None: - library_type = library["type"] - # for all lib of the same type, remove the actual lib, and add all the lib to "possible_merge_parent" - for lib in libraries_list: - is_child = LibrariesMerge.query.filter_by(child_lib=lib["name"]).first() - if ( - lib["type"] == library_type - and lib["name"] != library["name"] - and is_child is None - ): - if "possible_merge_parent" not in library: - library["possible_merge_parent"] = [] - data = {"value": lib["name"], "text": lib["name"]} - library["possible_merge_parent"].append(data) - - if user.account_type != "Admin": - for library in libraries_list: - if library["available_for"] is not None: - available_for = str(library["available_for"]).split(",") - if str(user.id) not in available_for: - libraries_list.remove(library) - - generate_log(request, "SERVER") - - return jsonify(libraries_list) - - -@libraries_bp.route("/create_library", methods=["POST"]) -def create_lib(): - the_request = request.get_json() - the_request = json.loads(the_request) - lib_name = the_request["name"] - lib_path = the_request["path"] - lib_type = the_request["type"] - lib_users = the_request["users"] - - if lib_users == "": - lib_users = None - - icons = { - "movies": "film", - "series": "videocam", - "consoles": "game-controller", - "tv": "tv", - "others": "desktop", - "books": "book", - "musics": "headset", - } - - function_to_call = { - "movies": scans.getMovies, - "series": scans.getSeries, - "consoles": scans.getGames, - "others": scans.getOthersVideos, - "books": scans.getBooks, - "musics": scans.getMusics, - } - - lib_path = lib_path.replace("\\", "/") - - exists = Libraries.query.filter_by(name=lib_name).first() is not None - if not exists: - new_lib = Libraries( - lib_name=lib_name, - lib_folder=lib_path, - lib_type=lib_type, - lib_image=icons[lib_type], - available_for=lib_users, - ) - DB.session.add(new_lib) - DB.session.commit() - - events.execute_event(events.NEW_LIBRARY, lib_name) - - try: - function_to_call[lib_type](lib_name) - except Exception: - pass - - return jsonify({"error": "worked"}) - else: - abort(409) - - -@libraries_bp.route("/edit_library", methods=["POST"]) -def edit_lib(): - token = request.headers.get("Authorization") - if token not in all_auth_tokens: - abort(401) - - the_request = request.get_json() - default_path = the_request["default_path"] - lib_name = the_request["name"] - lib_path = the_request["path"] - lib_type = the_request["type"] - lib_users = the_request["users"] - merge_parent = the_request["merge_parent"] - - merge_libraries(merge_parent, lib_name) - - lib_path = lib_path.replace("\\", "/") - - lib = Libraries.query.filter_by(folder=default_path).first() - if lib is None: - abort(404) - - if lib_path is not None: - lib.folder = lib_path - if lib_type is not None: - lib.type = lib_type - if lib_users is not None: - if len(lib_users.split(",")) == 1: - lib_users = int(lib_users.replace('"', "")) - lib.available_for = lib_users - DB.session.commit() - return jsonify({"error": "worked"}) - - -@libraries_bp.route("/delete_library", methods=["POST"]) -def delete_lib(): - the_request = request.get_json() - - lib_name = the_request["name"] - lib = Libraries.query.filter_by(name=lib_name).first() - - if lib is None: - abort(404) - - DB.session.delete(lib) - - lib_type = lib.type - - if lib_type == "movies": - all_movies = Movies.query.filter_by(library_name=lib_name).all() - for movie in all_movies: - DB.session.delete(movie) - elif lib_type == "series": - all_series = Series.query.filter_by(library_name=lib_name).all() - for serie in all_series: - seasons = Seasons.query.filter_by(serie=serie.id).all() - for season in seasons: - episodes = Episodes.query.filter_by(season_id=season.season_id).all() - for episode in episodes: - DB.session.delete(episode) - DB.session.delete(season) - DB.session.delete(serie) - elif lib_type == "consoles": - all_games = Games.query.filter_by(library_name=lib_name).all() - for game in all_games: - DB.session.delete(game) - elif lib_type == "others": - all_other = OthersVideos.query.filter_by(library_name=lib_name).all() - for other in all_other: - DB.session.delete(other) - - DB.session.commit() - - events.execute_event(events.LIBRARY_DELETE, lib_name) - - return jsonify({"error": "worked"}) - - -@libraries_bp.route("/start_intro_detection", methods=["POST"]) -def start_intro_detection(): - from multiprocessing import Process - from chocolate_app.intro import intro_detection - - check_authorization(request, request.headers.get("Authorization")) - check_admin(request, request.headers.get("Authorization")) - - process = Process(target=intro_detection.start) - process.start() - - return jsonify(True) - - -@libraries_bp.route("/rescan_all", methods=["POST"]) -def rescan_all(): - libraries = Libraries.query.all() - libraries = [library.__dict__ for library in libraries] - - libraries = natsort.natsorted(libraries, key=itemgetter(*["name"])) - libraries = natsort.natsorted(libraries, key=itemgetter(*["type"])) - - type_to_call = { - "series": scans.getSeries, - "consoles": scans.getGames, - "others": scans.getOthersVideos, - "books": scans.getBooks, - "musics": scans.getMusics, - } - - for library in libraries: - if library["type"] in type_to_call: - type_to_call[library["type"]](library["name"]) - return jsonify(True) - - -@libraries_bp.route("/rescan/", methods=["POST"]) -def rescan(library): - exists = Libraries.query.filter_by(name=library).first() is not None - - type_to_call = { - "series": scans.getSeries, - "movies": scans.getMovies, - "consoles": scans.getGames, - "others": scans.getOthersVideos, - "books": scans.getBooks, - "musics": scans.getMusics, - } - - if exists: - library = Libraries.query.filter_by(name=library).first().__dict__ - merges = LibrariesMerge.query.filter_by(parent_lib=library["name"]).all() - for merge in merges: - child = Libraries.query.filter_by(name=merge.child_lib).first() - type_to_call[child.type](child.name) - type_to_call[library["type"]](library["name"]) - return jsonify(True) - return jsonify(False) - - -def merge_libraries(parent, child): - if not child: - return - - if not parent: - merge = LibrariesMerge.query.filter_by(child_lib=child).first() - if merge is not None: - DB.session.delete(merge) - DB.session.commit() - return - - parent = Libraries.query.filter_by(name=parent).first() - if parent is None: - return - - child = Libraries.query.filter_by(name=child).first() - if child is None: - return - - if parent.type != child.type: - return - - exist = LibrariesMerge.query.filter_by( - parent_lib=parent.name, child_lib=child.name - ).first() - # child is already a parent - is_parent = LibrariesMerge.query.filter_by(parent_lib=child.name).first() - - if exist is None and is_parent is None: - fusion = LibrariesMerge(parent_lib=parent.name, child_lib=child.name) - DB.session.add(fusion) - DB.session.commit() - elif is_parent is None: - fusion = LibrariesMerge.query.filter_by( - parent_lib=parent.name, child_lib=child.name - ).first() - DB.session.delete(fusion) - DB.session.commit() - return diff --git a/src/chocolate_app/routes/settings.py b/src/chocolate_app/routes/settings.py deleted file mode 100644 index ad46073..0000000 --- a/src/chocolate_app/routes/settings.py +++ /dev/null @@ -1,145 +0,0 @@ -from flask import Blueprint, jsonify, request - -from chocolate_app.plugins_loader import events -from chocolate_app.tables import Users, Libraries -from chocolate_app import config, write_config, tmdb - -settings_bp = Blueprint("settings", __name__) - - -@settings_bp.route("/get_settings", methods=["GET"]) -def get_settings(): - all_users = Users.query.all() - all_libraries = Libraries.query.all() - users = [] - for user in all_users: - user = user.__dict__ - del user["_sa_instance_state"] - user["password"] = "Ratio" - users.append(user) - - libs = [] - for library in all_libraries: - library = library.__dict__ - del library["_sa_instance_state"] - libs.append(library) - - data = { - "users": users, - "libraries": libs, - } - - all_sections = config.sections() - for section in all_sections: - section_data = config[section] - the_data = {} - for key in section_data: - the_data[key] = section_data[key] - data[section] = the_data - - return jsonify(data) - - -@settings_bp.route("/save_settings", methods=["GET", "POST"]) -def save_settings(): - global client_id, client_secret - body = request.get_json() - tmdb_api_key = body["tmdbKey"] - language = body["language"] - igdb_secret_key = body["igdbSecret"] - igdb_client_id = body["igdbID"] - - radarr_adress = body["radarrAdress"] - radarrfolder = body["radarrFolder"] - radarr_api_key = body["radarrAPI"] - sonarr_adress = body["sonarrAdress"] - sonarrfolder = body["sonarrFolder"] - sonarr_api_key = body["sonarrAPI"] - readarr_adress = body["readarrAdress"] - readarrfolder = body["readarrFolder"] - readarr_api_key = body["readarrAPI"] - lidarr_adress = body["lidarrAdress"] - lidarrfolder = body["lidarrFolder"] - lidarr_api_key = body["lidarrAPI"] - - if radarr_adress != "": - if radarr_adress.startswith("https://"): - radarr_adress = radarr_adress.replace("https://", "http://") - if not radarr_adress.startswith("http://"): - radarr_adress = f"http://{radarr_adress}" - config.set("ARRSettings", "radarrurl", radarr_adress) - if radarrfolder != "": - radarrfolder = radarrfolder.replace("\\", "/") - if not radarrfolder.endswith("/"): - radarrfolder = f"{radarrfolder}/" - config.set("ARRSettings", "radarrfolder", radarrfolder) - if radarr_api_key != "": - config.set("APIKeys", "radarr", radarr_api_key) - - if sonarr_adress != "": - if sonarr_adress.startswith("https://"): - sonarr_adress = sonarr_adress.replace("https://", "http://") - if not sonarr_adress.startswith("http://"): - sonarr_adress = f"http://{sonarr_adress}" - config.set("ARRSettings", "sonarrurl", sonarr_adress) - if sonarrfolder != "": - sonarrfolder = sonarrfolder.replace("\\", "/") - if not sonarrfolder.endswith("/"): - sonarrfolder = f"{sonarrfolder}/" - config.set("ARRSettings", "sonarrfolder", sonarrfolder) - if sonarr_api_key != "": - config.set("APIKeys", "sonarr", sonarr_api_key) - - if readarr_adress != "": - if readarr_adress.startswith("https://"): - readarr_adress = readarr_adress.replace("https://", "http://") - if not readarr_adress.startswith("http://"): - readarr_adress = f"http://{readarr_adress}" - config.set("ARRSettings", "readarrurl", readarr_adress) - if readarrfolder != "": - readarrfolder = readarrfolder.replace("\\", "/") - if not readarrfolder.endswith("/"): - readarrfolder = f"{readarrfolder}/" - config.set("ARRSettings", "readarrfolder", readarrfolder) - if readarr_api_key != "": - config.set("ARRSettings", "readarrurl", readarr_adress) - - if lidarr_adress != "": - if lidarr_adress.startswith("https://"): - lidarr_adress = lidarr_adress.replace("https://", "http://") - if not lidarr_adress.startswith("http://"): - lidarr_adress = f"http://{lidarr_adress}" - config.set("ARRSettings", "lidarrurl", lidarr_adress) - if lidarrfolder != "": - lidarrfolder = lidarrfolder.replace("\\", "/") - if not lidarrfolder.endswith("/"): - lidarrfolder = f"{lidarrfolder}/" - config.set("ARRSettings", "lidarrfolder", lidarrfolder) - if lidarr_api_key != "": - config.set("ARRSettings", "lidarrurl", lidarr_adress) - if tmdb_api_key != "": - config.set("APIKeys", "TMDB", tmdb_api_key) - tmdb.api_key = tmdb_api_key - if igdb_client_id != "" and igdb_secret_key != "": - config.set("APIKeys", "igdbid", igdb_client_id) - config.set("APIKeys", "igdbsecret", igdb_secret_key) - client_id = igdb_client_id - client_secret = igdb_secret_key - - if language != "undefined": - config.set("ChocolateSettings", "language", language.lower()) - - try: - allow_download = body["allowDownloadsCheckbox"] - if allow_download == "on": - config.set("ChocolateSettings", "allowdownload", "true") - else: - config.set("ChocolateSettings", "allowdownload", "false") - except Exception: - config.set("ChocolateSettings", "allowdownload", "false") - - write_config(config) - - events.execute_event(events.SETTINGS_UPDATE, config) - - return jsonify({"error": "success"}) diff --git a/src/chocolate_app/routes/users.py b/src/chocolate_app/routes/users.py deleted file mode 100644 index 1da1a62..0000000 --- a/src/chocolate_app/routes/users.py +++ /dev/null @@ -1,296 +0,0 @@ -import os -import time -import base64 -import io - -from PIL import Image -from flask import Blueprint, Response, jsonify, request, abort, send_file -from werkzeug.security import generate_password_hash - -from chocolate_app import DB, get_dir_path, all_auth_tokens, IMAGES_PATH -from chocolate_app.tables import Users, InviteCodes -from chocolate_app.utils.utils import check_authorization, generate_log -from chocolate_app.plugins_loader.events import Events, execute_event - -dir_path = get_dir_path() -users_bp = Blueprint("users", __name__) - -@users_bp.route("/get_all_users", methods=["GET"]) -def get_all_users(): - all_users = Users.query.filter().all() - all_users_list = [] - for user in all_users: - profile_picture = user.profile_picture - if not os.path.exists(dir_path + profile_picture): - profile_picture = "/static/img/avatars/defaultUserProfilePic.png" - user_dict = { - "name": user.name, - "profile_picture": profile_picture, - "account_type": user.account_type, - "password_empty": True if not user.password else False, - "id": user.id, - } - all_users_list.append(user_dict) - return jsonify(all_users_list) - -@users_bp.route("/profile_picture/", methods=["GET"]) -def get_profile_picture(id): - user = Users.query.filter_by(id=id).first() - if not user: - return send_file("static/img/avatars/defaultUserProfilePic.png") - - profile_picture = user.profile_picture - if not os.path.exists(profile_picture): - profile_picture = "static/img/avatars/defaultUserProfilePic.png" - return send_file(profile_picture) - -@users_bp.route("/login", methods=["POST"]) -def login(): - from uuid import uuid4 - - auth_token = str(uuid4()) - account_name = "" - if "name" not in request.get_json() and not "username" in request.get_json(): - abort(400, "Missing name or username") - elif "name" not in request.get_json(): - account_name = request.get_json()["username"] - else: - account_name = request.get_json()["name"] - - account_password = request.get_json()["password"] - user = Users.query.filter_by(name=account_name).first() - token = f"Bearer {auth_token}" - actual_time_in_seconds = int(time.time()) - all_auth_tokens[token] = {"user": account_name, "time": actual_time_in_seconds} - if user: - user_object = user.__dict__ - del user_object["_sa_instance_state"] - - if user.account_type == "Kid": - generate_log(request, "LOGIN") - execute_event(Events.LOGIN, user) - return jsonify( - {"id": user.id, "name": user.name, "error": "None", "token": auth_token, "user": user_object} - ) - elif user.verify_password(account_password): - generate_log(request, "LOGIN") - execute_event(Events.LOGIN, user) - return jsonify( - {"id": user.id, "name": user.name, "error": "None", "token": auth_token, "user": user_object} - ) - else: - generate_log(request, "ERROR") - user = user.__dict__ - user["error"] = "Unauthorized" - execute_event(Events.LOGIN, user) - return jsonify({"error": "Unauthorized"}) - else: - generate_log(request, "ERROR") - user = user.__dict__ - user["error"] = "Unauthorized" - execute_event(Events.LOGIN, user) - return jsonify({"error": "Unauthorized"}) - - -@users_bp.route("/create_account", methods=["POST"]) -def create_account(): - body = request.get_json() - account_name = body["username"] - account_password = body["password"] - account_type_input = body["type"] - - profile_picture = f"{IMAGES_PATH}/avatars/{account_name}.webp" - if "profile_picture" not in body: - profile_picture = "/static/img/avatars/defaultUserProfilePic.png" - else: - file_base64 = body["profile_picture"] - if file_base64.startswith("data:image"): - file_base64 = file_base64.split(",", 1)[1] - - full_path = profile_picture - - image_data = base64.b64decode(file_base64) - - # Lire l'image à partir des bytes - image = Image.open(io.BytesIO(image_data)) - - # Déterminer le format de l'image - image_format = image.format.lower() - - # Convertir l'image en format WebP si nécessaire - if image_format != "webp": - output_buffer = io.BytesIO() - image.save(output_buffer, "WEBP") - output_buffer.seek(0) - image = Image.open(output_buffer) - - # Enregistrer l'image au format WebP - image.save(full_path, "WEBP") - image.close() - - user_exists = Users.query.filter_by(name=account_name).first() - nb_users = len(Users.query.filter().all()) - if user_exists: - abort(409) - account_type_input = account_type_input.lower() - account_type_input = account_type_input.capitalize() - - if nb_users == 0: - account_type_input = "Admin" - - new_user = Users( - name=account_name, - password=account_password, - profile_picture=profile_picture, - account_type=account_type_input, - ) - DB.session.add(new_user) - DB.session.commit() - return jsonify( - { - "id": new_user.id, - "name": new_user.name, - } - ) - - -@users_bp.route("/edit_profil", methods=["POST"]) -def edit_profil() -> Response: - authorization = request.headers.get("Authorization") - - if authorization not in all_auth_tokens: - abort(401, "Unauthorized") - - user = Users.query.filter_by(name=all_auth_tokens[authorization]["user"]).first() - - body = request.get_json() - - user_name = body["name"] - password = body["password"] - - db_user = Users.query.filter_by(name=user_name).first() - - if not db_user or str(db_user.id) != str(user.id): - abort(500, "You are not allowed to change the username of this user") - - type = None - if "type" in body: - type = body["type"] - id = body["id"] - - if str(id) != str(user.id) and user.account_type != "Admin": - abort(401, "Unauthorized") - - username_in_tokens = all_auth_tokens[authorization]["user"] - user = Users.query.filter_by(name=username_in_tokens).first() - try: - f = request.files["image"] - name, extension = os.path.splitext(f.filename or "") - profile_picture = f"/static/img/{user_name}{extension}" - if extension == "": - profile_picture = "/static/img/avatars/defaultUserProfilePic.png" - except Exception: - profile_picture = "/static/img/avatars/defaultUserProfilePic.png" - - user_to_edit = Users.query.filter_by(id=id).first() - - if user_to_edit.name != user_name: - user_to_edit.name = user_name - - if type and user_to_edit.account_type != type: - user_to_edit.account_type = type - - if user_to_edit.password != generate_password_hash(password) and len(password) > 0: - user_to_edit.password = generate_password_hash(password) - - if password == "": - user_to_edit.password = None - if ( - user_to_edit.profile_picture != profile_picture - and "/static/img/avatars/defaultUserProfilePic.png" not in profile_picture - ): - f = request.files["profile_picture"] - f.save(f"{dir_path}{profile_picture}") - user_to_edit.profile_picture = profile_picture - - DB.session.commit() - - return jsonify( - { - "id": user_to_edit.id, - "name": user_to_edit.name, - } - ) - - -@users_bp.route("/delete_account", methods=["POST"]) -def delete_account() -> Response: - authorization = request.headers.get("Authorization") - check_authorization(request, authorization) - body = request.get_json() - if not body: - abort(400, "Missing body") - if "id" not in body: - abort(400, "Missing id") - id = body["id"] - - user = Users.query.filter_by(id=id).first() - DB.session.delete(user) - DB.session.commit() - - execute_event(Events.USER_DELETE, user) - - return jsonify( - { - "id": user.id, - "name": user.name, - } - ) - - -@users_bp.route("/get_profil/") -def get_profil(id: int) -> Response: - user = Users.query.filter_by(id=id).first() - profile_picture = user.profile_picture - if not os.path.exists(profile_picture): - profile_picture = "/static/img/avatars/defaultUserProfilePic.png" - user_dict = { - "name": user.name, - "profile_picture": profile_picture, - "account_type": user.account_type, - } - return jsonify(user_dict) - - -@users_bp.route("/is_admin", methods=["GET"]) -def is_admin() -> Response: - authorization = request.headers.get("Authorization") - check_authorization(request, authorization) - user = Users.query.filter_by(name=all_auth_tokens[authorization]["user"]).first() - if user.account_type == "Admin": - return jsonify(True) - else: - return jsonify(False) - - -@users_bp.route("/invite_exist/", methods=["GET"]) -def invite_exist(hash: str) -> Response: - can = InviteCodes.query.filter_by(code=hash).first() is not None - return jsonify(can) - - -@users_bp.route("/create_invite", methods=["POST"]) -def create_invite() -> Response: - authorization = request.headers.get("Authorization") - check_authorization(request, authorization) - user = Users.query.filter_by(name=all_auth_tokens[authorization]["user"]).first() - - if user.account_type != "Admin": - abort(401, "Unauthorized") - - body = request.get_json() - code = body["code"] - new_invite = InviteCodes(code=code) - DB.session.add(new_invite) - DB.session.commit() - return jsonify({"code": code})