import DBcm from mariadb import ProgrammingError import pandas as pd import sqlite3 import urllib.request import json import re import ssl import imdb_suche from werkzeug.security import generate_password_hash, check_password_hash from flask_login import UserMixin def get_movie_details(imdb_url): """ Ruft eine IMDb-Filmseite auf und extrahiert den Regisseur und die Hauptdarsteller. Args: imdb_url (str): Die vollständige URL zur IMDb-Filmseite. Returns: dict: Ein Wörterbuch mit "director" und "actors" oder None bei einem Fehler. """ if not re.match(r"https://www.imdb.com/de/title/tt\d+/?", imdb_url): print(f"Fehler: Ungültige IMDb-URL: {imdb_url}. Die URL sollte so aussehen: https://www.imdb.com/de/title/tt...") return None try: # SSL-Kontext erstellen, um Zertifikatsüberprüfungsfehler zu umgehen context = ssl._create_unverified_context() # User-Agent setzen, um wie ein Browser auszusehen und Blockaden zu vermeiden headers = { 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'} req = urllib.request.Request(imdb_url, headers=headers) with urllib.request.urlopen(req, context=context) as response: html_content = response.read().decode('utf-8') # Suchen Sie nach dem strukturierten Datenblock (JSON-LD), der Filminformationen enthält match = re.search( r'', html_content, re.DOTALL) if not match: print("Fehler: Konnte den JSON-LD-Datenblock auf der Seite nicht finden.") return None json_data = json.loads(match.group(1)) # Extrahieren des Regisseurs director = [d.get('name') for d in json_data.get("director", [])] if isinstance( json_data.get("director"), list) else [json_data.get("director", {}).get("name")] # Extrahieren der Schauspieler actors = [a.get('name') for a in json_data.get("actor", [])] # Extrahieren des Erscheinungsjahres release_year = json_data.get("datePublished", "").split("-")[0] # Extrahieren des Genres genre = json_data.get("genre", []) if isinstance(genre, list): genre = ", ".join(genre) return { "director": [d for d in director if d], "actors": [a for a in actors if a], "release_year": release_year, "genre": genre } except urllib.error.HTTPError as e: print(f"HTTP-Fehler beim Abrufen der URL: {e.code} {e.reason}") return None except Exception as e: print(f"Ein Fehler ist aufgetreten: {e}") return None def create_movie_database(db_name="movie_db.db"): create_user_table = """ CREATE TABLE IF NOT EXISTS user ( id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT, username VARCHAR(64) NOT NULL UNIQUE, password_hash VARCHAR(128) NOT NULL ); """ create_my_list = """ create table if not exists movie_list ( id integer not null primary key autoincrement, titel varchar(64) not null, genre_id integer not null, regie_id integer, medium_id integer not null, release_year integer, user_id integer not null, FOREIGN KEY (user_id) REFERENCES user (id), UNIQUE(titel, user_id) ) """ create_genre = """ create table if not exists genre ( id integer not null primary key autoincrement, name varchar(64) not null ) """ create_regie = """ create table if not exists regie ( id integer not null primary key autoincrement, surname varchar(64) not null, lastname varchar(64) not null ) """ create_medium = """ create table if not exists medium ( id integer not null primary key autoincrement, medium varchar(64) not null ) """ ADD_GENRE_VALUE = """ INSERT INTO genre(name) SELECT ? WHERE NOT EXISTS (SELECT 1 FROM genre WHERE name = ?); """ ADD_MEDIUM_VALUE = """ INSERT INTO medium(medium) SELECT ? WHERE NOT EXISTS (SELECT 1 FROM medium WHERE medium = ?); """ ADD_REGIE_VALUE = """ INSERT INTO regie (surname, lastname ) SELECT ?, ? WHERE NOT EXISTS (SELECT surname, lastname FROM regie WHERE surname = ? AND lastname = ?); """ with DBcm.UseDatabase(db_name) as db: db.execute(create_user_table) db.execute(create_my_list) db.execute(create_genre) db.execute(create_regie) db.execute(create_medium) with open("genre_list", "r") as fs: for genre_value in fs.readlines(): with DBcm.UseDatabase(db_name) as db: db.execute(ADD_GENRE_VALUE, (genre_value.strip(), genre_value.strip())) usecols = ["Name", "Vorname"] air = pd.read_csv("regie_name.csv", usecols=usecols) for count, (reg_name, reg_vorname) in enumerate(zip(air["Name"], air["Vorname"])): # print(count, reg_vorname, reg_name) with DBcm.UseDatabase(db_name) as db: db.execute(ADD_REGIE_VALUE, (reg_vorname, reg_name, reg_vorname, reg_name)) LISTE_MEDIUM = ["BlueRay", "DVD", "Datei", "BlueRay Steelbook", "DVD Steelbook"] with DBcm.UseDatabase(db_name) as db: for MEDIUM in LISTE_MEDIUM: db.execute(ADD_MEDIUM_VALUE, (MEDIUM, MEDIUM)) def all_select(db_name="movie_db.db", what_select="genre"): ALL_SELECT = "SELECT * from " + what_select if what_select == "genre" or what_select == "medium": with DBcm.UseDatabase(db_name) as db: db.execute(ALL_SELECT) all_value = [i[1] for i in db.fetchall()] return all_value elif what_select == 'regie': all_value = [] with DBcm.UseDatabase(db_name) as db: db.execute(ALL_SELECT) for i in db.fetchall(): all_value.append(i[1] + " " + i[2]) return all_value else: return "Wrong Value !!!" def get_or_create_regie(director_name: str, db_name: str = "movie_db.db"): ADD_REGIE_VALUE = """ INSERT INTO regie (surname, lastname ) SELECT ?, ? WHERE NOT EXISTS (SELECT surname, lastname FROM regie WHERE surname = ? AND lastname = ?); """ director_split = director_name.split(" ") surname = director_split[0] lastname = " ".join(director_split[1:]) with DBcm.UseDatabase(db_name) as db: db.execute(ADD_REGIE_VALUE, (surname, lastname, surname, lastname)) db.execute("SELECT id FROM regie WHERE surname = ? AND lastname = ?", (surname, lastname)) regie_id = db.fetchone()[0] return regie_id def get_or_create_genre(genre_name: str, db_name: str = "movie_db.db"): ADD_GENRE_VALUE = """ INSERT INTO genre(name) SELECT ? WHERE NOT EXISTS (SELECT 1 FROM genre WHERE name = ?); """ with DBcm.UseDatabase(db_name) as db: db.execute(ADD_GENRE_VALUE, (genre_name, genre_name)) db.execute("SELECT id FROM genre WHERE name = ?", (genre_name,)) genre_id = db.fetchone()[0] return genre_id def search_id(search_name: str, select_from: str = "genre", select_where: str = "name", db_name: str = "movie_db.db"): if select_from == "regie": split_search = search_name.split(" ") GENRE_QUERY = f"""select id from {select_from} where surname = ? and lastname = ?""" with DBcm.UseDatabase(db_name) as db: db.execute(GENRE_QUERY, (split_search[0], split_search[1],)) regie_id = db.fetchone()[0] return int(regie_id) else: try: GENRE_QUERY = f"""select id from {select_from} where {select_where} = ?""" with DBcm.UseDatabase(db_name) as db: db.execute(GENRE_QUERY, (search_name,)) genre_id = db.fetchone()[0] return int(genre_id) except: return int(0) def scrape_and_add_movie(movie_name: str, medium_id: int, user_id: int, db_name: str = "movie_db.db"): url = imdb_suche.suche_film_url(movie_name) if not url: return "not_found_imdb" details = get_movie_details(url + "/") if not details: return "not_found_imdb" regie_id = None if details.get("director"): regie_id = get_or_create_regie(director_name=details["director"][0]) release_year = details.get("release_year") genre_name = details.get("genre") genre_id = None if genre_name: genre_id = get_or_create_genre(genre_name=genre_name) SQL_PARAM = f""" INSERT INTO movie_list (titel, genre_id, regie_id, medium_id, release_year, user_id) VALUES (?, ?, ?, ?, ?, ?); """ try: with DBcm.UseDatabase(db_name) as db: db.execute(SQL_PARAM, (movie_name.lower(), genre_id, regie_id, medium_id, release_year, user_id,)) except sqlite3.IntegrityError as e: print(f"Error adding movie (duplicate): {e}") return "duplicate" except ProgrammingError as e: print(f"Error adding movie: {e}") return "error" except Exception as e: print(f"An unexpected error occurred: {e}") return "error" return "success" def add_manual_movie(movie_name: str, medium_id: int, director_name: str, release_year: int, genre_name: str, user_id: int, db_name: str = "movie_db.db"): regie_id = get_or_create_regie(director_name=director_name) genre_id = get_or_create_genre(genre_name=genre_name) SQL_PARAM = f""" INSERT INTO movie_list (titel, genre_id, regie_id, medium_id, release_year, user_id) VALUES (?, ?, ?, ?, ?, ?); """ try: with DBcm.UseDatabase(db_name) as db: db.execute(SQL_PARAM, (movie_name.lower(), genre_id, regie_id, medium_id, release_year, user_id,)) except (ProgrammingError, sqlite3.IntegrityError) as e: print(f"Error adding movie: {e}") return False return True def show_movie_list(db_name="movie_db.db", user_id=None, search_query=None, page=1, per_page=20): """Zeigt eine paginierte Liste von Filmen an, die nach einem Suchbegriff gefiltert werden können.""" # Basis-SQL-Abfrage SQL_BASE = """FROM movie_list INNER JOIN genre ON movie_list.genre_id=genre.id INNER JOIN regie ON movie_list.regie_id=regie.id INNER JOIN medium ON movie_list.medium_id=medium.id""" # Filter-Bedingungen params = [] where_clauses = [] if user_id: where_clauses.append("movie_list.user_id = ?") params.append(user_id) if search_query: search_param = f"%{search_query}%" where_clauses.append("(titel LIKE ? OR genre.name LIKE ? OR (regie.surname || ' ' || regie.lastname) LIKE ?)") params.extend([search_param, search_param, search_param]) SQL_WHERE = "" if where_clauses: SQL_WHERE = " WHERE " + " AND ".join(where_clauses) # Verbindung zur Datenbank db = sqlite3.connect(db_name) # Gesamtzahl der Filme für die Paginierung ermitteln COUNT_SQL = f"SELECT COUNT(movie_list.id) {SQL_BASE}{SQL_WHERE}" total_movies = db.execute(COUNT_SQL, params).fetchone()[0] total_pages = (total_movies + per_page - 1) // per_page # Filme für die aktuelle Seite abrufen offset = (page - 1) * per_page SELECT_SQL = f"""SELECT movie_list.id, titel, genre.name AS genre, regie.surname as regie_surname, regie.lastname as regie_lastname, medium.medium, movie_list.release_year {SQL_BASE}{SQL_WHERE} ORDER BY titel ASC LIMIT ? OFFSET ?""" paginated_params = params + [per_page, offset] SELCET_VALUE = pd.read_sql(SELECT_SQL, db, params=paginated_params) # Ergebnis in eine Liste von Dictionaries umwandeln return_list_dict = [] for id, titel, genre, regie_surname, regie_lastname, medium, release_year in zip(SELCET_VALUE["id"], SELCET_VALUE["titel"], SELCET_VALUE["genre"], SELCET_VALUE["regie_surname"], SELCET_VALUE["regie_lastname"], SELCET_VALUE["medium"], SELCET_VALUE["release_year"]): return_list_dict.append( {"id": id, "titel": titel, "genre": genre, "regie": regie_surname + " " + regie_lastname, "medium": medium, "release_year": release_year}) return return_list_dict, total_pages def get_movie_by_title(movie_title: str, db_name: str = "movie_db.db"): SQL_PARAM = f"""SELECT movie_list.id, titel, genre.name AS genre, regie.surname as regie_surname, regie.lastname as regie_lastname, medium.medium, movie_list.release_year FROM movie_list INNER JOIN genre ON movie_list.genre_id=genre.id INNER JOIN regie ON movie_list.regie_id=regie.id INNER JOIN medium ON movie_list.medium_id=medium.id WHERE movie_list.titel = ? ; """ with DBcm.UseDatabase(db_name) as db: db.execute(SQL_PARAM, (movie_title.lower(),)) movie = db.fetchone() if movie: return {"id": movie[0], "titel": movie[1], "genre": movie[2], "regie": movie[3] + " " + movie[4], "medium": movie[5], "release_year": movie[6]} return None class User(UserMixin): def __init__(self, id, username, password_hash): self.id = id self.username = username self.password_hash = password_hash def set_password(self, password): self.password_hash = generate_password_hash(password) def check_password(self, password): return check_password_hash(self.password_hash, password) def add_user(username, password, db_name="movie_db.db"): password_hash = generate_password_hash(password) SQL = "INSERT INTO user (username, password_hash) VALUES (?, ?)" with DBcm.UseDatabase(db_name) as db: db.execute(SQL, (username, password_hash)) def get_user_by_username(username, db_name="movie_db.db"): SQL = "SELECT * FROM user WHERE username = ?" with DBcm.UseDatabase(db_name) as db: db.execute(SQL, (username,)) user_data = db.fetchone() if user_data: return User(id=user_data[0], username=user_data[1], password_hash=user_data[2]) return None def get_user_by_id(user_id, db_name="movie_db.db"): SQL = "SELECT * FROM user WHERE id = ?" with DBcm.UseDatabase(db_name) as db: db.execute(SQL, (user_id,)) user_data = db.fetchone() if user_data: return User(id=user_data[0], username=user_data[1], password_hash=user_data[2]) return None def delete_movie_by_id(movie_id: int, db_name: str = "movie_db.db"): SQL = "DELETE FROM movie_list WHERE id = ?" with DBcm.UseDatabase(db_name) as db: db.execute(SQL, (movie_id,)) return db.rowcount > 0 if __name__ == "__main__": create_movie_database() # print(all_select()) # id_genre = search_id( # search_name="DVD", select_from="medium", select_where="medium") scrape_and_add_movie(movie_name="Die unglaubliche Reise in einem verrückten Flugzeug", medium_id=1) for test in show_movie_list(): print(test.get("id"), test.get("medium"), test.get("release_year"))