forked from server/soundboard
180 lines
4.8 KiB
Python
180 lines
4.8 KiB
Python
import os
|
|
import random
|
|
import sys
|
|
import subprocess
|
|
import sqlite3
|
|
|
|
from flask import Flask, render_template, request, redirect, url_for, send_from_directory, g, jsonify
|
|
|
|
import config
|
|
|
|
# The Flask application object that all routes below are registered on.
app = Flask(__name__)

# Template whitespace control: strip leading whitespace in front of block
# tags and drop the first newline after them.
app.jinja_env.lstrip_blocks = True
app.jinja_env.trim_blocks = True
|
|
|
|
# Credits: http://flask.pocoo.org/docs/0.12/patterns/sqlite3/
|
|
def getDB():
    """Return the SQLite connection for the current application context.

    The connection is created on first use and cached on ``g`` under
    ``_database`` so that later calls in the same context reuse it. An
    absolute ``config.db`` is used as-is; a relative one is resolved against
    ``sys.path[0]``. Rows come back as ``sqlite3.Row`` objects.
    """
    connection = getattr(g, "_database", None)
    if connection is not None:
        return connection

    location = config.db
    if not os.path.isabs(location):
        location = os.path.join(sys.path[0], location)

    connection = sqlite3.connect(location)
    g._database = connection
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
@app.teardown_appcontext
def closeDBConnection(exception):
    """Close the cached SQLite connection, if one was opened.

    Registered as an app-context teardown handler. ``exception`` is the
    argument the teardown hook passes in; it is not used here.
    """
    connection = getattr(g, "_database", None)
    if connection is None:
        # No query ran in this context, so there is nothing to close.
        return
    connection.close()
|
|
|
|
def queryDB(query, args=(), one=False):
    """Execute ``query`` with bound ``args`` and return what it fetched.

    With ``one`` false, the full list of rows is returned (possibly empty).
    With ``one`` true, only the first row is returned, or ``None`` when the
    query produced no rows. This function does not commit.
    """
    cursor = getDB().execute(query, args)
    rows = cursor.fetchall()
    cursor.close()

    if not one:
        return rows
    if rows:
        return rows[0]
    return None
|
|
|
|
@app.route("/")
@app.route("/edit")
@app.route("/play/<sound>")
@app.route("/say/", methods=["POST"])
@app.route("/say/<text>")
def index(sound=None, text=None, video=None):
    """Render the soundboard and start whatever playback the request asks for.

    Depending on the route and query string this will:

    * ``/play/<sound>``: play ``sound`` with omxplayer if it is a file in
      ``config.path``.
    * ``/say/<text>`` or POST ``/say/``: speak the text with espeak, using the
      optional form fields ``voice``, ``speed`` and ``pitch``, then redirect
      to ``/``.
    * ``?video=...``: play an ``.mp3`` location directly, or resolve the value
      with youtube-dl and play the resulting stream URL(s).
    * ``?killvideo=1|true|yes``: kill running omxplayer processes.

    The ``video`` parameter is never filled by a route; the value is always
    read from the query string.

    Returns the rendered ``index.html`` (with ``edit`` true on ``/edit``), or
    a redirect to ``/`` after a say request.

    Raises ``subprocess.CalledProcessError`` if youtube-dl exits non-zero.
    """
    sounds = sorted(os.fsencode(file).decode() for file in os.listdir(config.path))
    tags = queryDB("SELECT name FROM tag ORDER BY name COLLATE NOCASE")

    # Only names that are actual directory entries are played, so the URL
    # cannot point outside the sound directory.
    if sound is not None and sound in sounds:
        subprocess.Popen(
            ["omxplayer", os.path.join(config.path, sound).encode("utf-8")],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    # The POST variant of /say/ carries the text in the form body.
    if text is None:
        text = request.form.get("text")

    if text is not None:
        # Blank or missing form fields fall back to the defaults.
        voice = request.form.get("voice", default="")
        voice = voice if voice.strip() != "" else "DE"
        speed = request.form.get("speed", default="")
        speed = speed if speed.strip() != "" else "160"
        pitch = request.form.get("pitch", default="")
        pitch = pitch if pitch.strip() != "" else "50"

        subprocess.Popen(
            ["espeak", "-v", voice, "-s", speed, "-p", pitch, text.encode("utf-8")],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        return redirect("/")

    video = request.args.get("video")

    if video is not None:
        if video.endswith(".mp3"):
            # NOTE(review): the request-supplied value is handed to omxplayer
            # unvalidated; consider restricting what is accepted here.
            subprocess.Popen(["omxplayer", video], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
        else:
            # "--" ends option parsing so a value starting with "-" is treated
            # as the video to look up rather than as a youtube-dl option.
            output = subprocess.check_output(["youtube-dl", "-g", "-f", "mp4", "--", video]).decode()

            # youtube-dl prints one URL per line. Ignore blank lines so that a
            # single-URL result does not start omxplayer with an empty argument
            # (and a result without a newline does not raise IndexError).
            urls = [line.strip() for line in output.splitlines() if line.strip()]

            if len(urls) > 1:
                subprocess.Popen(["omxplayer", urls[1]])
            if urls:
                subprocess.Popen(["omxplayer", "-b", urls[0]])

    killvideo = request.args.get("killvideo")

    if killvideo is not None and killvideo in ["1", "true", "yes"]:
        subprocess.Popen(["pkill", "-f", "omxplayer"])

    return render_template("index.html", sounds=sounds, tags=tags, edit=request.path == "/edit")
|
|
|
|
@app.route("/edit/<sound>", methods=["GET", "POST"])
def edit(sound):
    """Show or save the tag assignment of one sound.

    GET renders ``edit.html`` with every tag and a ``checked`` flag (0/1)
    telling whether it is currently attached to the button whose ``file`` is
    ``sound``.

    POST applies the submitted checkboxes (form fields named ``cb-<tag name>``)
    unless the ``cancel`` field is set, then redirects to ``/edit``. A
    ``button`` row is created for ``sound`` if none exists yet. All changes of
    one submission are committed together, so a failure part-way through does
    not leave a half-saved tag set.
    """
    tags = queryDB("""\
        SELECT
            tag.id,
            tag.name,
            checked.id IS NOT NULL AS checked
        FROM
            tag
        LEFT OUTER JOIN (
            SELECT
                tag.id
            FROM
                tag
            JOIN
                button_tags
            ON
                fk_tag = tag.id
            WHERE fk_button = (
                SELECT
                    button.id
                FROM
                    button
                WHERE
                    button.file = ?
            )
        ) AS checked
        ON tag.id = checked.id""", (sound,))

    if request.method == "POST":
        if not request.form.get("cancel"):
            button = queryDB("SELECT id FROM button WHERE file = ?", (sound,), True)

            if button is None:
                # First time this sound is tagged: create its button row. The
                # follow-up SELECT runs on the same connection, so it sees the
                # new row before the commit below.
                queryDB("INSERT INTO button (file) VALUES (?)", (sound,))
                button = queryDB("SELECT id FROM button WHERE file = ?", (sound,), True)

            button_id = button["id"]

            for tag in tags:
                checkbox = 1 if request.form.get("cb-{}".format(tag["name"])) else 0

                if tag["checked"] < checkbox:
                    # Newly ticked: attach the tag.
                    queryDB(
                        "INSERT OR REPLACE INTO button_tags (fk_button, fk_tag) VALUES (?, ?)",
                        (button_id, tag["id"]),
                    )
                elif tag["checked"] > checkbox:
                    # Newly unticked: detach the tag.
                    queryDB(
                        "DELETE FROM button_tags WHERE fk_button = ? AND fk_tag = ?",
                        (button_id, tag["id"]),
                    )

            # One commit for the whole submission instead of one per statement.
            getDB().commit()

        return redirect("/edit")

    return render_template("edit.html", sound=sound, tags=tags)
|
|
|
|
@app.route("/sounds/<path:name>")
def sounds(name):
    """Serve the file ``name`` from the configured sound directory."""
    directory = config.path
    return send_from_directory(directory, name)
|
|
|
|
@app.route("/random")
def play_random():
    """Play one randomly chosen file from ``config.path`` with omxplayer.

    If the sound directory is empty nothing is played; the page is still
    rendered instead of failing on the empty selection.

    Returns the rendered ``index.html``.
    """
    sounds = [os.fsencode(file).decode() for file in os.listdir(config.path)]

    # Picking from an empty list raises, so skip playback when there are no files.
    if sounds:
        random_sound = random.choice(sounds)
        subprocess.Popen(
            ["omxplayer", os.path.join(config.path, random_sound).encode("utf-8")],
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )

    # NOTE(review): unlike index(), no sounds/tags/edit values are passed to
    # the template here - confirm index.html renders as intended without them.
    return render_template("index.html")
|
|
|
|
@app.route("/tags/<tags>")
def tags(tags):
    """Return, as a JSON list, the sounds that carry every requested tag.

    ``tags`` is a comma-separated list of tag names from the URL. Each entry
    of the response is the matching button's ``file`` value cut at its first
    ``"."``.
    """
    names = tags.split(",")

    # Credits: https://stackoverflow.com/a/38955049/1532986
    # One HAVING condition per requested tag; the names themselves are passed
    # as bound parameters, never interpolated into the SQL text.
    condition = "\t\tSUM(CASE WHEN tag.name = ? THEN 1 ELSE 0 END) > 0"
    having = " AND \n".join(condition for _ in names)

    query = """\
SELECT button.file
FROM button
WHERE button.id IN (
SELECT button_tags.fk_button
FROM button_tags
JOIN tag
ON tag.id = button_tags.fk_tag
GROUP BY button_tags.fk_button
HAVING
""" + having + "\n)"

    rows = queryDB(query, names)
    return jsonify([row["file"].split(".")[0] for row in rows])
|