Files
MyMovieDB/moviedb_func.py
jonnybravo ac32152725
All checks were successful
Deploy to ras-dan-01 / deploy (push) Successful in -22s
try
2025-08-26 16:54:54 +02:00

435 lines
15 KiB
Python

import DBcm
from mariadb import ProgrammingError
import pandas as pd
import sqlite3
import urllib.request
import json
import re
import ssl
import imdb_suche
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
def _json_ld_names(value):
    """Return the non-empty ``name`` values of a JSON-LD entry or list of entries."""
    if isinstance(value, dict):
        value = [value]
    if not isinstance(value, list):
        return []
    return [item["name"] for item in value
            if isinstance(item, dict) and item.get("name")]


def get_movie_details(imdb_url):
    """Fetch an IMDb title page and extract details from its JSON-LD block.

    Args:
        imdb_url (str): Full URL of the IMDb title page. It must start with
            ``https://www.imdb.com/de/title/tt<digits>``.

    Returns:
        dict | None: A dict with the keys ``"director"`` (list of names),
        ``"actors"`` (list of names), ``"release_year"`` (text before the
        first ``-`` of ``datePublished``, possibly empty) and ``"genre"``
        (list entries joined with ``", "``), or ``None`` when the URL is
        rejected, the request fails, or the page cannot be parsed.
    """
    # The dots are escaped so that only the literal IMDb host name passes.
    if not re.match(r"https://www\.imdb\.com/de/title/tt\d+/?", imdb_url):
        print(f"Fehler: Ungültige IMDb-URL: {imdb_url}. Die URL sollte so aussehen: https://www.imdb.com/de/title/tt...")
        return None
    try:
        # NOTE(review): certificate verification is switched off here to work
        # around certificate errors; this makes the request vulnerable to
        # man-in-the-middle attacks. Prefer ssl.create_default_context() once
        # the host's CA store is fixed.
        context = ssl._create_unverified_context()
        # Send a browser-like User-Agent to avoid being blocked.
        headers = {
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'}
        req = urllib.request.Request(imdb_url, headers=headers)
        with urllib.request.urlopen(req, context=context) as response:
            html_content = response.read().decode('utf-8')

        # Locate the structured data block (JSON-LD) that holds the movie info.
        match = re.search(
            r'<script type="application/ld\+json">(.*?)</script>', html_content, re.DOTALL)
        if not match:
            print("Fehler: Konnte den JSON-LD-Datenblock auf der Seite nicht finden.")
            return None
        json_data = json.loads(match.group(1))

        # "director" and "actor" may be a single object or a list of objects.
        director = _json_ld_names(json_data.get("director"))
        actors = _json_ld_names(json_data.get("actor"))

        # The year is the part before the first "-"; tolerate a null value.
        release_year = (json_data.get("datePublished") or "").split("-")[0]

        genre = json_data.get("genre", [])
        if isinstance(genre, list):
            genre = ", ".join(genre)

        return {
            "director": director,
            "actors": actors,
            "release_year": release_year,
            "genre": genre
        }
    except urllib.error.HTTPError as e:
        print(f"HTTP-Fehler beim Abrufen der URL: {e.code} {e.reason}")
        return None
    except Exception as e:
        # Best-effort scraper: any other failure is reported and mapped to None.
        print(f"Ein Fehler ist aufgetreten: {e}")
        return None
def create_movie_database(db_name="movie_db.db"):
    """Create the tables of the movie database and seed the lookup tables.

    Creates the tables ``user``, ``movie_list``, ``genre``, ``regie`` and
    ``medium`` if they do not exist yet. It then inserts, skipping values
    that are already present:

    * one genre per non-blank line of the file ``genre_list``,
    * one director per row of ``regie_name.csv`` (columns ``Name`` and
      ``Vorname``; ``Vorname`` is stored in ``regie.surname`` and ``Name``
      in ``regie.lastname``),
    * a fixed list of media types.

    Both input files are opened relative to the current working directory.

    Args:
        db_name: Database name/path handed to ``DBcm.UseDatabase``.
    """
    create_user_table = """
CREATE TABLE IF NOT EXISTS user (
id INTEGER NOT NULL PRIMARY KEY AUTOINCREMENT,
username VARCHAR(64) NOT NULL UNIQUE,
password_hash VARCHAR(128) NOT NULL
);
"""
    create_my_list = """
create table if not exists movie_list (
id integer not null primary key autoincrement,
titel varchar(64) not null,
genre_id integer not null,
regie_id integer,
medium_id integer not null,
release_year integer,
user_id integer not null,
FOREIGN KEY (user_id) REFERENCES user (id),
UNIQUE(titel, user_id)
)
"""
    create_genre = """
create table if not exists genre (
id integer not null primary key autoincrement,
name varchar(64) not null
)
"""
    create_regie = """
create table if not exists regie (
id integer not null primary key autoincrement,
surname varchar(64) not null,
lastname varchar(64) not null
)
"""
    create_medium = """
create table if not exists medium (
id integer not null primary key autoincrement,
medium varchar(64) not null
)
"""
    ADD_GENRE_VALUE = """
INSERT INTO genre(name)
SELECT ?
WHERE NOT EXISTS (SELECT 1 FROM genre WHERE name = ?);
"""
    ADD_MEDIUM_VALUE = """
INSERT INTO medium(medium)
SELECT ?
WHERE NOT EXISTS (SELECT 1 FROM medium WHERE medium = ?);
"""
    ADD_REGIE_VALUE = """
INSERT INTO regie (surname, lastname )
SELECT ?, ?
WHERE NOT EXISTS
(SELECT surname, lastname
FROM regie
WHERE surname = ? AND lastname = ?);
"""
    # Schema first, so the tables exist even if a seed file is missing.
    with DBcm.UseDatabase(db_name) as db:
        for ddl in (create_user_table, create_my_list, create_genre,
                    create_regie, create_medium):
            db.execute(ddl)

    # Genres: read the file once, then insert over a single connection
    # instead of opening one connection per line.
    with open("genre_list", "r") as fs:
        genre_names = [line.strip() for line in fs]
    with DBcm.UseDatabase(db_name) as db:
        for genre_name in genre_names:
            if genre_name:  # a blank line would otherwise create an empty genre
                db.execute(ADD_GENRE_VALUE, (genre_name, genre_name))

    # Directors: again one connection for the whole CSV.
    usecols = ["Name", "Vorname"]
    air = pd.read_csv("regie_name.csv", usecols=usecols)
    with DBcm.UseDatabase(db_name) as db:
        for reg_name, reg_vorname in zip(air["Name"], air["Vorname"]):
            db.execute(ADD_REGIE_VALUE, (reg_vorname,
                                         reg_name, reg_vorname, reg_name))

    LISTE_MEDIUM = ["BlueRay", "DVD", "Datei",
                    "BlueRay Steelbook", "DVD Steelbook"]
    with DBcm.UseDatabase(db_name) as db:
        for MEDIUM in LISTE_MEDIUM:
            db.execute(ADD_MEDIUM_VALUE, (MEDIUM, MEDIUM))
def all_select(db_name="movie_db.db", what_select="genre"):
    """Return the display values of one lookup table.

    Args:
        db_name: Database name/path handed to ``DBcm.UseDatabase``.
        what_select: ``"genre"``, ``"medium"`` or ``"regie"``.

    Returns:
        For ``"genre"`` and ``"medium"`` a list with the second column of
        every row; for ``"regie"`` a list of ``"<surname> <lastname>"``
        strings; for any other value the string ``"Wrong Value !!!"``.
    """
    query = "SELECT * from " + what_select

    # Only the three known table names are ever executed.
    if what_select not in ("genre", "medium", "regie"):
        return "Wrong Value !!!"

    with DBcm.UseDatabase(db_name) as db:
        db.execute(query)
        rows = db.fetchall()

    if what_select == "regie":
        return [row[1] + " " + row[2] for row in rows]
    return [row[1] for row in rows]
def get_or_create_regie(director_name: str, db_name: str = "movie_db.db"):
    """Return the id of a director row, inserting the row first if missing.

    The name is split at the first space: the part before it is stored in
    ``regie.surname``, everything after it in ``regie.lastname``.

    Args:
        director_name: Full name of the director.
        db_name: Database name/path handed to ``DBcm.UseDatabase``.

    Returns:
        The ``id`` column of the matching ``regie`` row.
    """
    insert_if_missing = """
INSERT INTO regie (surname, lastname )
SELECT ?, ?
WHERE NOT EXISTS
(SELECT surname, lastname
FROM regie
WHERE surname = ? AND lastname = ?);
"""
    surname, _, lastname = director_name.partition(" ")
    name_pair = (surname, lastname)
    with DBcm.UseDatabase(db_name) as db:
        db.execute(insert_if_missing, name_pair + name_pair)
        db.execute("SELECT id FROM regie WHERE surname = ? AND lastname = ?", name_pair)
        row = db.fetchone()
        return row[0]
def get_or_create_genre(genre_name: str, db_name: str = "movie_db.db"):
    """Return the id of a genre row, inserting the row first if missing.

    Args:
        genre_name: Genre name to look up in ``genre.name``.
        db_name: Database name/path handed to ``DBcm.UseDatabase``.

    Returns:
        The ``id`` column of the matching ``genre`` row.
    """
    insert_if_missing = """
INSERT INTO genre(name)
SELECT ?
WHERE NOT EXISTS (SELECT 1 FROM genre WHERE name = ?);
"""
    with DBcm.UseDatabase(db_name) as db:
        db.execute(insert_if_missing, (genre_name, genre_name))
        db.execute("SELECT id FROM genre WHERE name = ?", (genre_name,))
        row = db.fetchone()
        return row[0]
def search_id(search_name: str, select_from: str = "genre", select_where: str = "name", db_name: str = "movie_db.db"):
    """Look up the id of a row by its name.

    For ``select_from == "regie"`` the name is split at the first space
    into ``surname`` and ``lastname`` (the same rule ``get_or_create_regie``
    uses when storing directors), so one-word names and multi-word last
    names are matched correctly. For every other table the row whose
    ``select_where`` column equals ``search_name`` is used.

    Args:
        search_name: Value to search for.
        select_from: Table name.
        select_where: Column compared with ``search_name`` (ignored for
            ``"regie"``).
        db_name: Database name/path handed to ``DBcm.UseDatabase``.

    Returns:
        int: The id of the first matching row, or ``0`` when no row
        matches. For tables other than ``"regie"``, ``0`` is also returned
        when the table/column name is not a plain identifier or the query
        fails.
    """
    if select_from == "regie":
        surname, _, lastname = search_name.partition(" ")
        GENRE_QUERY = f"""select id from {select_from}
where surname = ? and lastname = ?"""
        with DBcm.UseDatabase(db_name) as db:
            db.execute(GENRE_QUERY, (surname, lastname))
            row = db.fetchone()
        # A missing director yields 0, like a missing row in any other table.
        return int(row[0]) if row else 0

    # Table and column names cannot be bound as SQL parameters, so they are
    # only interpolated when they are plain identifiers.
    for identifier in (select_from, select_where):
        if not (isinstance(identifier, str) and identifier.isidentifier()):
            return 0

    try:
        GENRE_QUERY = f"""select id from {select_from}
where {select_where} = ?"""
        with DBcm.UseDatabase(db_name) as db:
            db.execute(GENRE_QUERY, (search_name,))
            row = db.fetchone()
        return int(row[0]) if row else 0
    except Exception:
        # Deliberate best effort: a failing lookup is reported as "not found".
        # Unlike a bare ``except`` this lets KeyboardInterrupt/SystemExit through.
        return 0
def scrape_and_add_movie(movie_name: str, medium_id: int, user_id: int, db_name: str = "movie_db.db"):
    """Look a movie up on IMDb and add it to a user's movie list.

    The IMDb URL comes from ``imdb_suche.suche_film_url`` and the details
    from ``get_movie_details``. The first director and the genre string are
    resolved to ids (created on demand) in the same database the movie is
    written to.

    Args:
        movie_name: Title to search for; stored lower-cased.
        medium_id: Id of the medium row.
        user_id: Id of the owning user.
        db_name: Database name/path used for every lookup and the insert.

    Returns:
        bool: ``True`` if the movie was inserted; ``False`` if no URL or
        details were found or the insert raised ``ProgrammingError`` /
        ``sqlite3.IntegrityError``.
    """
    url = imdb_suche.suche_film_url(movie_name)
    if not url:
        return False
    details = get_movie_details(url + "/")
    if not details:
        return False

    # Pass db_name on so the lookup rows land in the same database as the movie.
    regie_id = None
    if details.get("director"):
        regie_id = get_or_create_regie(
            director_name=details["director"][0], db_name=db_name)

    release_year = details.get("release_year")

    genre_name = details.get("genre")
    genre_id = None
    if genre_name:
        genre_id = get_or_create_genre(genre_name=genre_name, db_name=db_name)

    SQL_PARAM = """
INSERT INTO movie_list (titel, genre_id, regie_id, medium_id, release_year, user_id)
VALUES (?, ?, ?, ?, ?, ?);
"""
    try:
        with DBcm.UseDatabase(db_name) as db:
            db.execute(SQL_PARAM, (movie_name.lower(), genre_id,
                                   regie_id, medium_id, release_year, user_id,))
    except (ProgrammingError, sqlite3.IntegrityError) as e:
        print(f"Error adding movie: {e}")
        return False
    return True
def add_manual_movie(movie_name: str, medium_id: int, director_name: str, release_year: int, genre_name: str, user_id: int, db_name: str = "movie_db.db"):
    """Add a movie with manually supplied details to a user's movie list.

    Director and genre are resolved to ids (created on demand) in the same
    database the movie is written to.

    Args:
        movie_name: Title; stored lower-cased.
        medium_id: Id of the medium row.
        director_name: Full name of the director.
        release_year: Release year.
        genre_name: Genre name.
        user_id: Id of the owning user.
        db_name: Database name/path used for every lookup and the insert.

    Returns:
        bool: ``True`` if the movie was inserted, ``False`` if the insert
        raised ``ProgrammingError`` / ``sqlite3.IntegrityError``.
    """
    # Pass db_name on so the lookup rows land in the same database as the movie.
    regie_id = get_or_create_regie(director_name=director_name, db_name=db_name)
    genre_id = get_or_create_genre(genre_name=genre_name, db_name=db_name)

    SQL_PARAM = """
INSERT INTO movie_list (titel, genre_id, regie_id, medium_id, release_year, user_id)
VALUES (?, ?, ?, ?, ?, ?);
"""
    try:
        with DBcm.UseDatabase(db_name) as db:
            db.execute(SQL_PARAM, (movie_name.lower(), genre_id,
                                   regie_id, medium_id, release_year, user_id,))
    except (ProgrammingError, sqlite3.IntegrityError) as e:
        print(f"Error adding movie: {e}")
        return False
    return True
def show_movie_list(db_name="movie_db.db", user_id=None, search_query=None, page=1, per_page=20):
    """Return one page of movies, optionally filtered by user and search term.

    Args:
        db_name: Path of the SQLite database file.
        user_id: If truthy, only movies of this user are returned.
        search_query: If truthy, only movies whose title, genre name or
            ``"<surname> <lastname>"`` director name contains this text
            (SQL ``LIKE``) are returned.
        page: 1-based page number.
        per_page: Number of movies per page.

    Returns:
        tuple: ``(movies, total_pages)`` where ``movies`` is a list of dicts
        with the keys ``id``, ``titel``, ``genre``, ``regie``, ``medium``
        and ``release_year``.
    """
    # NOTE(review): the INNER JOIN on regie drops movies whose regie_id is
    # NULL (the schema allows that) — confirm whether this is intended.
    SQL_BASE = """FROM movie_list
INNER JOIN genre ON movie_list.genre_id=genre.id
INNER JOIN regie ON movie_list.regie_id=regie.id
INNER JOIN medium ON movie_list.medium_id=medium.id"""

    # Collect the filter conditions together with their bound parameters.
    params = []
    where_clauses = []
    if user_id:
        where_clauses.append("movie_list.user_id = ?")
        params.append(user_id)
    if search_query:
        search_param = f"%{search_query}%"
        where_clauses.append("(titel LIKE ? OR genre.name LIKE ? OR (regie.surname || ' ' || regie.lastname) LIKE ?)")
        params.extend([search_param, search_param, search_param])

    SQL_WHERE = ""
    if where_clauses:
        SQL_WHERE = " WHERE " + " AND ".join(where_clauses)

    COUNT_SQL = f"SELECT COUNT(movie_list.id) {SQL_BASE}{SQL_WHERE}"
    # ORDER BY keeps the pages stable: without it the row order, and thus
    # the content of each LIMIT/OFFSET window, is unspecified.
    SELECT_SQL = f"""SELECT
movie_list.id,
titel,
genre.name AS genre,
regie.surname as regie_surname,
regie.lastname as regie_lastname,
medium.medium,
movie_list.release_year
{SQL_BASE}{SQL_WHERE}
ORDER BY movie_list.id
LIMIT ? OFFSET ?"""
    offset = (page - 1) * per_page
    paginated_params = params + [per_page, offset]

    db = sqlite3.connect(db_name)
    try:
        # Total number of matching movies, rounded up to whole pages.
        total_movies = db.execute(COUNT_SQL, params).fetchone()[0]
        total_pages = (total_movies + per_page - 1) // per_page
        frame = pd.read_sql(SELECT_SQL, db, params=paginated_params)
    finally:
        # Always release the connection, even when a query fails.
        db.close()

    # Convert the result into a list of dictionaries.
    columns = ("id", "titel", "genre", "regie_surname",
               "regie_lastname", "medium", "release_year")
    return_list_dict = []
    for movie_id, titel, genre, regie_surname, regie_lastname, medium, release_year in zip(
            *(frame[column] for column in columns)):
        return_list_dict.append(
            {"id": movie_id, "titel": titel, "genre": genre, "regie": regie_surname + " " + regie_lastname, "medium": medium, "release_year": release_year})
    return return_list_dict, total_pages
def get_movie_by_title(movie_title: str, db_name: str = "movie_db.db"):
    """Fetch one movie, joined with genre, director and medium, by title.

    Args:
        movie_title: Title to look up; it is lower-cased before the query.
        db_name: Database name/path handed to ``DBcm.UseDatabase``.

    Returns:
        dict | None: A dict with the keys ``id``, ``titel``, ``genre``,
        ``regie`` (``"<surname> <lastname>"``), ``medium`` and
        ``release_year`` for the first matching row, or ``None`` if no row
        matches.
    """
    query = f"""SELECT
movie_list.id,
titel,
genre.name AS genre,
regie.surname as regie_surname,
regie.lastname as regie_lastname,
medium.medium,
movie_list.release_year
FROM movie_list
INNER JOIN genre ON movie_list.genre_id=genre.id
INNER JOIN regie ON movie_list.regie_id=regie.id
INNER JOIN medium ON movie_list.medium_id=medium.id
WHERE movie_list.titel = ?
;
"""
    with DBcm.UseDatabase(db_name) as db:
        db.execute(query, (movie_title.lower(),))
        row = db.fetchone()

    if not row:
        return None
    return {
        "id": row[0],
        "titel": row[1],
        "genre": row[2],
        "regie": row[3] + " " + row[4],
        "medium": row[5],
        "release_year": row[6],
    }
class User(UserMixin):
    """A row of the ``user`` table, usable with flask_login via ``UserMixin``."""

    def __init__(self, id, username, password_hash):
        """Store the id, user name and password hash of the user."""
        self.id, self.username, self.password_hash = id, username, password_hash

    def set_password(self, password):
        """Replace the stored hash with the hash of ``password``."""
        new_hash = generate_password_hash(password)
        self.password_hash = new_hash

    def check_password(self, password):
        """Return whether ``password`` matches the stored hash."""
        stored_hash = self.password_hash
        return check_password_hash(stored_hash, password)
def add_user(username, password, db_name="movie_db.db"):
    """Insert a new user; only the hash of ``password`` is stored.

    Args:
        username: Name of the new user.
        password: Plain-text password, hashed with ``generate_password_hash``.
        db_name: Database name/path handed to ``DBcm.UseDatabase``.
    """
    new_row = (username, generate_password_hash(password))
    with DBcm.UseDatabase(db_name) as db:
        db.execute("INSERT INTO user (username, password_hash) VALUES (?, ?)", new_row)
def get_user_by_username(username, db_name="movie_db.db"):
    """Return the ``User`` with the given user name, or ``None`` if absent.

    Args:
        username: User name to look up.
        db_name: Database name/path handed to ``DBcm.UseDatabase``.
    """
    with DBcm.UseDatabase(db_name) as db:
        db.execute("SELECT * FROM user WHERE username = ?", (username,))
        row = db.fetchone()

    if not row:
        return None
    # The first three columns of the row are used as id, username, password_hash.
    return User(id=row[0], username=row[1], password_hash=row[2])
def get_user_by_id(user_id, db_name="movie_db.db"):
    """Return the ``User`` with the given id, or ``None`` if absent.

    Args:
        user_id: Value compared with the ``id`` column.
        db_name: Database name/path handed to ``DBcm.UseDatabase``.
    """
    with DBcm.UseDatabase(db_name) as db:
        db.execute("SELECT * FROM user WHERE id = ?", (user_id,))
        row = db.fetchone()

    if not row:
        return None
    # The first three columns of the row are used as id, username, password_hash.
    return User(id=row[0], username=row[1], password_hash=row[2])
def delete_movie_by_id(movie_id: int, db_name: str = "movie_db.db"):
    """Delete the movie with the given id.

    Args:
        movie_id: Id of the ``movie_list`` row to delete.
        db_name: Database name/path handed to ``DBcm.UseDatabase``.

    Returns:
        bool: ``True`` if the statement reported at least one affected row.
    """
    with DBcm.UseDatabase(db_name) as db:
        db.execute("DELETE FROM movie_list WHERE id = ?", (movie_id,))
        affected_rows = db.rowcount
    return affected_rows > 0
if __name__ == "__main__":
    # Manual smoke test: build the database, scrape one movie, print the list.
    create_movie_database()
    # print(all_select())
    # id_genre = search_id(
    # search_name="DVD", select_from="medium", select_where="medium")
    # scrape_and_add_movie requires a user_id.
    # NOTE(review): user_id=1 assumes the first registered user — confirm.
    scrape_and_add_movie(movie_name="Die unglaubliche Reise in einem verrückten Flugzeug",
                         medium_id=1, user_id=1)
    # show_movie_list returns (movies, total_pages); iterate the movies only.
    movies, _total_pages = show_movie_list()
    for test in movies:
        print(test.get("id"), test.get("medium"), test.get("release_year"))