"""Flask web front-end for playing sounds on the host machine.

Sound files from ``config.path`` are played through omxplayer, free text is
spoken with ``espeak``, videos are resolved with ``youtube-dl`` and the
tag/button relations are stored in the SQLite database named by ``config.db``.
"""
import os
import random
import sys
import subprocess
import sqlite3

from flask import Flask, render_template, request, redirect, url_for, send_from_directory, g, jsonify
from omxplayer import *

import config

app = Flask(__name__)
app.jinja_env.trim_blocks = True
app.jinja_env.lstrip_blocks = True


# Credits: http://flask.pocoo.org/docs/0.12/patterns/sqlite3/
def getDB():
    """Return the SQLite connection of the current application context.

    The connection is created on first use and cached on ``flask.g``. A
    relative ``config.db`` is resolved against ``sys.path[0]``. Rows are
    returned as ``sqlite3.Row`` so columns can be read by name.
    """
    db = getattr(g, "_database", None)
    if db is None:
        if os.path.isabs(config.db):
            dbPath = config.db
        else:
            dbPath = os.path.join(sys.path[0], config.db)
        db = g._database = sqlite3.connect(dbPath)
        db.row_factory = sqlite3.Row
    return db


@app.teardown_appcontext
def closeDBConnection(exception):
    """Close the cached connection, if any, when the app context ends.

    ``exception`` is supplied by Flask's teardown hook and is not used.
    """
    db = getattr(g, "_database", None)
    if db is not None:
        db.close()


def queryDB(query, args=(), one=False):
    """Execute ``query`` with the bound ``args`` and return the fetched rows.

    With ``one=True`` the first row is returned, or ``None`` when the result
    is empty; otherwise the full list of rows is returned. The statement is
    not committed here; callers that modify data commit themselves.
    """
    cur = getDB().execute(query, args)
    result = cur.fetchall()
    cur.close()
    return (result[0] if result else None) if one else result


def _list_sounds():
    """Return the names of all entries in ``config.path`` as ``str``."""
    return [os.fsencode(name).decode() for name in os.listdir(config.path)]


def _form_value(name, fallback):
    """Return form field ``name``, or ``fallback`` if it is missing or blank."""
    value = request.form.get(name, default="")
    return value if value.strip() != "" else fallback


def _speak(text):
    """Start ``espeak`` for ``text`` using voice/speed/pitch from the form."""
    voice = _form_value("voice", "DE")
    speed = _form_value("speed", "160")
    pitch = _form_value("pitch", "50")
    # Fire and forget: the request must not block until speaking has finished.
    subprocess.Popen(
        ["espeak", "-v", voice, "-s", speed, "-p", pitch, text.encode("utf-8")],
        stdout=subprocess.DEVNULL,
        stderr=subprocess.DEVNULL,
    )


def _play_video(video):
    """Play ``video`` directly if it is an mp3, else resolve it via youtube-dl."""
    if video.endswith(".mp3"):
        player.OMXPlayer(video)
        return
    output = subprocess.check_output(["youtube-dl", "-g", "-f", "mp4", video]).decode()
    for line in output.splitlines():
        url = line.strip()
        # The command output ends with a newline; never hand an empty URL
        # to the player.
        if url:
            player.OMXPlayer(url, args=["-b"])


@app.route("/")
@app.route("/edit")
@app.route("/play/")
@app.route("/play/<sound>")
@app.route("/say/", methods=["GET", "POST"])
@app.route("/say/<text>")
def index(sound=None, text=None, video=None):
    """Render the main page and trigger the playback actions of the request.

    * ``sound`` (URL part) is played when it names an entry of ``config.path``.
    * ``text`` (URL part or form field ``text``) is spoken, after which the
      client is redirected to ``/``.
    * Query argument ``video`` is played; ``killvideo`` set to ``1``, ``true``
      or ``yes`` kills running omxplayer processes.

    The page is rendered in edit mode when requested as ``/edit``.
    """
    sounds = sorted(_list_sounds())
    tags = queryDB("SELECT name FROM tag ORDER BY name COLLATE NOCASE")

    # Only play names that really exist in the sound directory.
    if sound is not None and sound in sounds:
        player.OMXPlayer(os.path.join(config.path, sound).encode("utf-8"))

    if text is None:
        text = request.form.get("text")
    if text is not None:
        _speak(text)
        return redirect("/")

    video = request.args.get("video")
    if video is not None:
        _play_video(video)

    killvideo = request.args.get("killvideo")
    if killvideo is not None and killvideo in ["1", "true", "yes"]:
        subprocess.Popen(["pkill", "-f", "omxplayer"])

    edit_mode = request.path == "/edit"
    return render_template("index.html", sounds=sounds, tags=tags, edit=edit_mode)


@app.route("/edit/<sound>", methods=["GET", "POST"])
def edit(sound):
    """Show or update the tags assigned to the button of file ``sound``.

    GET renders ``edit.html`` with every tag and a ``checked`` flag. POST
    creates the button row if needed, adds/removes tag relations according to
    the ``cb-<tag name>`` checkboxes (unless ``cancel`` was submitted) and
    redirects to ``/edit``.
    """
    tags = queryDB("""\
        SELECT
            tag.id,
            tag.name,
            checked.id IS NOT NULL AS checked
        FROM tag
        LEFT OUTER JOIN (
            SELECT tag.id
            FROM tag
            JOIN button_tags ON fk_tag = tag.id
            WHERE fk_button = (
                SELECT button.id
                FROM button
                WHERE button.file = ?
            )
        ) AS checked ON tag.id = checked.id""", (sound,))

    if request.method == "POST":
        if not request.form.get("cancel"):
            db = getDB()
            buttonId = queryDB("SELECT id FROM button WHERE file = ?", (sound,), True)
            if buttonId is None:
                # First edit of this file: create its button row.
                queryDB("INSERT INTO button (file) VALUES (?)", (sound,))
                db.commit()
                buttonId = queryDB("SELECT id FROM button WHERE file = ?", (sound,), True)

            for tag in tags:
                checkbox = 1 if request.form.get("cb-{}".format(tag["name"])) else 0
                if tag["checked"] < checkbox:
                    # Newly ticked: add the relation.
                    queryDB("INSERT OR REPLACE INTO button_tags (fk_button, fk_tag) VALUES (?, ?)",
                            (buttonId[0], tag["id"]))
                elif tag["checked"] > checkbox:
                    # Newly unticked: remove the relation.
                    queryDB("DELETE FROM button_tags WHERE fk_button = ? AND fk_tag = ?",
                            (buttonId[0], tag["id"]))
            # Commit all tag changes of this request at once.
            db.commit()
        return redirect("/edit")

    return render_template("edit.html", sound=sound, tags=tags)


@app.route("/sounds/<name>")
def sounds(name):
    """Serve the file ``name`` from ``config.path``."""
    return send_from_directory(config.path, name)


@app.route("/random")
def play_random():
    """Play a randomly chosen entry of ``config.path`` and render the page.

    Nothing is played when the directory is empty.
    """
    sounds = _list_sounds()
    if sounds:
        random_sound = random.choice(sounds)
        player.OMXPlayer(os.path.join(config.path, random_sound).encode("utf-8"))
    return render_template("index.html")


@app.route("/tags/<tags>")
def tags(tags):
    """Return, as JSON, the buttons that carry every tag in ``tags``.

    ``tags`` is a comma-separated list of tag names. Each returned entry is
    the button's file name cut at its first ``.``.
    """
    names = tags.split(",")
    # Credits: https://stackoverflow.com/a/38955049/1532986
    query = """\
SELECT button.file
FROM button
WHERE button.id IN (
    SELECT button_tags.fk_button
    FROM button_tags
    JOIN tag ON tag.id = button_tags.fk_tag
    GROUP BY button_tags.fk_button
    HAVING
"""
    # One HAVING condition per requested tag; the names themselves are bound
    # as parameters, never interpolated into the SQL text.
    tagFilter = "\t\tSUM(CASE WHEN tag.name = ? THEN 1 ELSE 0 END) > 0"
    tagFilters = " AND \n".join([tagFilter for _ in names])
    query += tagFilters + "\n)"
    sounds = queryDB(query, names)
    return jsonify([sound["file"].split(".")[0] for sound in sounds])


@app.route("/tags/add/", methods=["GET", "POST"])
def tags_add():
    """Render the add-tag form (GET) or store the submitted tag (POST).

    A POST always redirects to ``/edit``; the tag is only written when
    ``cancel`` was not submitted and a ``tag`` field is present.
    """
    if request.method == "POST":
        tag = request.form.get("tag")
        if not request.form.get("cancel") and tag is not None:
            queryDB("INSERT OR REPLACE INTO tag (name) VALUES (?)", (tag,))
            getDB().commit()
        return redirect("/edit")
    return render_template("tags_add.html")


@app.route("/tags/delete/<name>")
def tags_delete(name):
    """Delete the tag called ``name`` and redirect to ``/edit``."""
    queryDB("DELETE FROM tag WHERE name = ?", (name,))
    getDB().commit()
    return redirect("/edit")