import os import sys import subprocess import sqlite3 from flask import Flask, render_template, request, redirect, url_for, send_from_directory, g import config app = Flask(__name__) app.jinja_env.trim_blocks = True app.jinja_env.lstrip_blocks = True # Credits: http://flask.pocoo.org/docs/0.12/patterns/sqlite3/ def getDB(): db = getattr(g, "_database", None) if db is None: if os.path.isabs(config.db): dbPath = config.db else: dbPath = os.path.join(sys.path[0], config.db) db = g._database = sqlite3.connect(dbPath) db.row_factory = sqlite3.Row return db @app.teardown_appcontext def closeDBConnection(exception): db = getattr(g, "_database", None) if db is not None: db.close() def queryDB(query, args=(), one=False): cur = getDB().execute(query, args) result = cur.fetchall() cur.close() return (result[0] if result else None) if one else result @app.route("/") @app.route("/edit") @app.route("/play/") @app.route("/say/", methods=["POST"]) @app.route("/say/") def index(sound=None, text=None, video=None): sounds = [os.fsencode(file).decode() for file in os.listdir(config.path)] sounds = sorted(sounds) tags = queryDB("SELECT name FROM tag ORDER BY name COLLATE NOCASE") if sound is not None and sound in sounds: subprocess.Popen(["omxplayer", os.path.join(config.path, sound).encode("utf-8")], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) if text is None: text = request.form.get("text") if text is not None: voice = request.form.get("voice", default="") voice = voice if voice.strip() != "" else "DE" speed = request.form.get("speed", default="") speed = speed if speed.strip() != "" else "160" pitch = request.form.get("pitch", default="") pitch = pitch if pitch.strip() != "" else "50" subprocess.Popen(["espeak", "-v", voice, "-s", speed, "-p", pitch, text.encode("utf-8")], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) return redirect("/") video = request.args.get("video") if video is not None: if video[-4:] == ".mp3": subprocess.Popen(["omxplayer", video], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) else: url = subprocess.check_output(["youtube-dl", "-g", "-f", "mp4", video]).decode() subprocess.Popen(["omxplayer", url.split("\n")[1]]) subprocess.Popen(["omxplayer", "-b", url.split("\n")[0]]) killvideo = request.args.get("killvideo") if killvideo is not None and killvideo in ["1", "true", "yes"]: subprocess.Popen(["pkill", "-f", "omxplayer"]) if request.path == "/edit": edit = True else: edit = False return render_template("index.html", sounds=sounds, tags=tags, edit=edit) @app.route("/edit/", methods=["GET", "POST"]) def edit(sound): tags = queryDB("""\ SELECT tag.id, tag.name, checked.id IS NOT NULL AS checked FROM tag LEFT OUTER JOIN ( SELECT tag.id FROM tag JOIN button_tags ON fk_tag = tag.id WHERE fk_button = ( SELECT button.id FROM button WHERE button.file = ? ) ) AS checked ON tag.id = checked.id""", (sound,)) if request.method == "POST": if not request.form.get("cancel"): buttonId = queryDB("SELECT id FROM button WHERE file = ?", (sound,), True) if buttonId is None: queryDB("INSERT INTO button (file) VALUES (?)", (sound,)) getDB().commit() buttonId = queryDB("SELECT id FROM button WHERE file = ?", (sound,), True) for tag in tags: checkbox = 1 if request.form.get("cb-{}".format(tag["name"])) else 0 if tag["checked"] < checkbox: queryDB("INSERT OR REPLACE INTO button_tags (fk_button, fk_tag) VALUES (?, ?)", (buttonId[0], tag["id"])) getDB().commit() elif tag["checked"] > checkbox: queryDB("DELETE FROM button_tags WHERE fk_button = ? AND fk_tag = ?", (buttonId[0], tag["id"])) getDB().commit() return redirect("/edit") return render_template("edit.html", sound=sound, tags=tags) @app.route("/sounds/") def sounds(name): return send_from_directory(config.path, name)