diff --git a/.gitignore b/.gitignore index cb64eaa..701dcbb 100644 --- a/.gitignore +++ b/.gitignore @@ -13,3 +13,5 @@ local_settings.py seatable-python-runner/ seatable-python-runner.zip + +.python-version diff --git a/scheduler/app/database/initial_tables.sql b/scheduler/app/database/initial_tables.sql index 12bba36..5d81870 100644 --- a/scheduler/app/database/initial_tables.sql +++ b/scheduler/app/database/initial_tables.sql @@ -5,15 +5,19 @@ CREATE TABLE IF NOT EXISTS `script_log` ( `org_id` int(11) DEFAULT NULL, `script_name` varchar(255) NOT NULL, `context_data` longtext DEFAULT NULL, - `started_at` datetime(6) NOT NULL, + `started_at` datetime(6) DEFAULT NULL, `finished_at` datetime(6) DEFAULT NULL, `success` tinyint(1) DEFAULT NULL, `return_code` int(11) DEFAULT NULL, `output` longtext CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci DEFAULT NULL, `operate_from` varchar(50) DEFAULT NULL COMMENT 'manualy, automation-rule...etc', + `state` varchar(10) DEFAULT NULL, + `created_at` datetime(6) DEFAULT NULL, PRIMARY KEY (`id`), KEY `started_at_c6ns09vt` (`started_at`), - KEY `dtable_uuid_script_name_l0j7h5f2_union_key` (`dtable_uuid`,`script_name`) + KEY `dtable_uuid_script_name_l0j7h5f2_union_key` (`dtable_uuid`,`script_name`), + KEY `state_h3u8i9o1_key` (`state`), + KEY `created_at_h3u7y9o4_key` (`created_at`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8; CREATE TABLE IF NOT EXISTS `dtable_run_script_statistics` ( diff --git a/scheduler/app/faas_scheduler/constants.py b/scheduler/app/faas_scheduler/constants.py new file mode 100644 index 0000000..5f40b23 --- /dev/null +++ b/scheduler/app/faas_scheduler/constants.py @@ -0,0 +1 @@ +SCRIPTS_KEY = "faas_scheduler:scripts" diff --git a/scheduler/app/faas_scheduler/models.py b/scheduler/app/faas_scheduler/models.py index 5f76c20..5858c7b 100644 --- a/scheduler/app/faas_scheduler/models.py +++ b/scheduler/app/faas_scheduler/models.py @@ -31,6 +31,13 @@ class ScriptLog(Base): return_code = Column(Integer, nullable=True) output = Column(Text, nullable=True) operate_from = Column(String(255)) + state = Column(String(10)) + created_at = Column(DateTime, index=True) + + PENDING = "pending" + DISPACHED = "dispatched" + RUNNING = "running" + FINISHED = "finished" def __init__( self, @@ -39,7 +46,8 @@ def __init__( org_id, script_name, context_data, - started_at, + state, + created_at, operate_from=None, ): self.dtable_uuid = dtable_uuid @@ -47,7 +55,8 @@ def __init__( self.org_id = org_id self.script_name = script_name self.context_data = context_data - self.started_at = started_at + self.state = state + self.created_at = created_at self.operate_from = operate_from def to_dict(self, include_context_data=True, include_output=True): @@ -57,6 +66,7 @@ def to_dict(self, include_context_data=True, include_output=True): "id": self.id, "dtable_uuid": self.dtable_uuid, "owner": self.owner, + "org_id": self.org_id, "script_name": self.script_name, "started_at": datetime_to_isoformat_timestr(self.started_at), "finished_at": self.finished_at @@ -64,6 +74,9 @@ def to_dict(self, include_context_data=True, include_output=True): "success": self.success, "return_code": self.return_code, "operate_from": self.operate_from, + "state": self.state, + "created_at": self.created_at + and datetime_to_isoformat_timestr(self.created_at), } if include_context_data: diff --git a/scheduler/app/faas_scheduler/redis_client.py b/scheduler/app/faas_scheduler/redis_client.py new file mode 100644 index 0000000..27d07d9 --- /dev/null +++ b/scheduler/app/faas_scheduler/redis_client.py @@ -0,0 
+1,78 @@ +import logging +import os +import time + +import redis + +logger = logging.getLogger(__name__) + + +class RedisClient: + def __init__(self): + self._client = self.get_redis_client() + + def get_redis_client(self): + pool = redis.ConnectionPool( + host=os.environ.get("REDIS_HOST") or "127.0.0.1", + port=int(os.environ.get("REDIS_PORT") or "6379"), + db=int(os.environ.get("REDIS_DB") or "0"), + password=os.environ.get("REDIS_PASSWORD", ""), + socket_timeout=3, + socket_connect_timeout=3, + retry_on_timeout=True, + health_check_interval=30, + decode_responses=True, + ) + + return redis.Redis(connection_pool=pool) + + # ========= executor within retry logic ========= + + def _execute(self, func, *args, **kwargs): + retry_count = 0 + while retry_count <= 3: + try: + return func(*args, **kwargs) + + except (redis.TimeoutError, redis.ConnectionError) as e: + logger.exception(e) + retry_count += 1 + time.sleep(0.1 * 2**retry_count) + continue + + except (redis.ResponseError, redis.DataError) as e: + logger.exception(e) + raise e + + # ========= KV ========= + + def get(self, key: str): + return self._execute(self._client.get, key) + + def set(self, key: str, value, ex: int | None = None): + return self._execute(self._client.set, key, value, ex=ex) + + # ========= LIST ========= + + def lpush(self, key: str, *values): + return self._execute(self._client.lpush, key, *values) + + def rpush(self, key: str, *values): + return self._execute(self._client.rpush, key, *values) + + def lpop(self, key: str): + return self._execute(self._client.lpop, key) + + def rpop(self, key: str): + return self._execute(self._client.rpop, key) + + def llen(self, key): + return self._execute(self._client.llen, key) + + # ========= utils ========= + + def exists(self, key: str) -> bool: + return bool(self._execute(self._client.exists, key)) + + def delete(self, *keys): + return self._execute(self._client.delete, *keys) diff --git a/scheduler/app/faas_scheduler/utils.py b/scheduler/app/faas_scheduler/utils.py index 331a3f1..4e3a4ab 100644 --- a/scheduler/app/faas_scheduler/utils.py +++ b/scheduler/app/faas_scheduler/utils.py @@ -2,24 +2,26 @@ import json import logging import requests -from datetime import datetime +from datetime import datetime, timedelta from typing import List, Optional, Tuple from uuid import UUID from tzlocal import get_localzone -from sqlalchemy import case, desc, func, text +from sqlalchemy import case, desc, func, text, and_ from sqlalchemy.orm import load_only -from faas_scheduler.models import ScriptLog +from faas_scheduler.models import ( + ScriptLog, + UserRunScriptStatistics, + OrgRunScriptStatistics, + DTableRunScriptStatistics, +) import sys sys.path.append("/opt/scheduler") -from database import DBSession logger = logging.getLogger(__name__) -STARTER_URL = os.getenv("PYTHON_STARTER_URL", "") -RUN_FUNC_URL = STARTER_URL.rstrip("/") + "/function/run-python" SEATABLE_SERVER_URL = os.getenv("SEATABLE_SERVER_URL", "") SCHEDULER_AUTH_TOKEN = os.getenv("PYTHON_SCHEDULER_AUTH_TOKEN", "") DELETE_LOG_DAYS = os.environ.get("DELETE_LOG_DAYS", "30") @@ -68,13 +70,8 @@ class ScriptInvalidException(Exception): pass -## part of ping to get the check if the python starter can be reached. 
-def ping_starter(): - response = requests.get(STARTER_URL.rstrip("/") + "/ping/", timeout=30) - if response.status_code == 200: - return True - - return False +class RunScriptError(Exception): + pass ## triggered from scheduler.py to remove old script_logs @@ -122,6 +119,34 @@ def delete_statistics_after_days(db_session): db_session.close() +def update_running_scripts_timeout(db_session): + deadline = datetime.now() - timedelta(seconds=SUB_PROCESS_TIMEOUT) + updated_count = ( + db_session.query(ScriptLog) + .filter( + and_( + ScriptLog.started_at.isnot(None), + ScriptLog.started_at <= deadline, + ScriptLog.state == ScriptLog.RUNNING, + ) + ) + .update( + { + ScriptLog.output: "timeout", + ScriptLog.return_code: -1, + ScriptLog.success: False, + ScriptLog.finished_at: datetime.now(), + ScriptLog.state: "finished", + }, + synchronize_session=False, + ) + ) + + db_session.commit() + + logger.info("updated %s script logs", updated_count) + + def check_auth_token(request): value = request.headers.get("Authorization", "") if ( @@ -163,101 +188,140 @@ def get_script_file(dtable_uuid, script_name): return response.json() -# call python-starter to run the script! -def call_faas_func(script_url, temp_api_token, context_data, script_id=None): +def update_stats_run_count(db_session, dtable_uuid, owner, org_id): + run_date = datetime.today().strftime("%Y-%m-%d") try: - data = { - "script_url": script_url, - "env": { - "dtable_web_url": SEATABLE_SERVER_URL.rstrip("/"), - "api_token": temp_api_token, - }, - "context_data": context_data, - "script_id": script_id, - } - headers = {"User-Agent": "python-scheduler/" + VERSION} - logger.debug("I call starter at url %s", RUN_FUNC_URL) - response = requests.post(RUN_FUNC_URL, json=data, timeout=30, headers=headers) - - # script will be executed asynchronously, so there will be nothing in response - # so only check response - - if response.status_code != 200: - logger.error( - "Fail to call scheduler: %s, data: %s, error response: %s, %s", - RUN_FUNC_URL, - data, - response.status_code, - response.text, + dtable_stats = ( + db_session.query(DTableRunScriptStatistics) + .filter_by(dtable_uuid=dtable_uuid, run_date=run_date) + .first() + ) + if not dtable_stats: + dtable_stats = DTableRunScriptStatistics( + dtable_uuid=dtable_uuid, + run_date=run_date, + total_run_count=1, + total_run_time=0, + update_at=datetime.now(), ) - + db_session.add(dtable_stats) + else: + dtable_stats.total_run_count += 1 + dtable_stats.update_at = datetime.now() + if org_id == -1: + if "@seafile_group" not in owner: + user_stats = ( + db_session.query(UserRunScriptStatistics) + .filter_by(username=owner, run_date=run_date) + .first() + ) + if not user_stats: + user_stats = UserRunScriptStatistics( + username=owner, + run_date=run_date, + total_run_count=1, + total_run_time=0, + update_at=datetime.now(), + ) + db_session.add(user_stats) + else: + user_stats.total_run_count += 1 + user_stats.update_at = datetime.now() + else: + org_stats = ( + db_session.query(OrgRunScriptStatistics) + .filter_by(org_id=org_id, run_date=run_date) + .first() + ) + if not org_stats: + org_stats = OrgRunScriptStatistics( + org_id=org_id, + run_date=run_date, + total_run_count=1, + total_run_time=0, + update_at=datetime.now(), + ) + db_session.add(org_stats) + else: + org_stats.total_run_count += 1 + org_stats.update_at = datetime.now() + db_session.commit() except Exception as e: - logger.error( - "Fail to call scheduler: %s, data: %s, error: %s", RUN_FUNC_URL, data, e + logger.exception( + "update stats for 
org_id %s owner %s dtable %s run count error %s", + org_id, + owner, + dtable_uuid, + e, ) - return None - -def update_statistics(db_session, dtable_uuid, owner, org_id, spend_time): - if not spend_time: - return - username = owner - - # dtable_run_script_statistcis - sqls = [ - """ - INSERT INTO dtable_run_script_statistics(dtable_uuid, run_date, total_run_count, total_run_time, update_at) VALUES - (:dtable_uuid, :run_date, 1, :spend_time, :update_at) - ON DUPLICATE KEY UPDATE - total_run_count=total_run_count+1, - total_run_time=total_run_time+:spend_time, - update_at=:update_at; - """ - ] - - # org_run_script_statistics - if org_id and org_id != -1: - sqls += [ - """ - INSERT INTO org_run_script_statistics(org_id, run_date, total_run_count, total_run_time, update_at) VALUES - (:org_id, :run_date, 1, :spend_time, :update_at) - ON DUPLICATE KEY UPDATE - total_run_count=total_run_count+1, - total_run_time=total_run_time+:spend_time, - update_at=:update_at; - """ - ] - - # user_run_script_statistics - if "@seafile_group" not in username: - sqls += [ - """ - INSERT INTO user_run_script_statistics(username, org_id, run_date, total_run_count, total_run_time, update_at) VALUES - (:username, :org_id, :run_date, 1, :spend_time, :update_at) - ON DUPLICATE KEY UPDATE - org_id=:org_id, - total_run_count=total_run_count+1, - total_run_time=total_run_time+:spend_time, - update_at=:update_at; - """ - ] +def update_stats_run_time(db_session, dtable_uuid, owner, org_id, spend_time): + run_date = datetime.today().strftime("%Y-%m-%d") try: - for sql in sqls: - db_session.execute( - text(sql), - { - "dtable_uuid": dtable_uuid, - "username": username, - "org_id": org_id, - "run_date": datetime.today(), - "spend_time": spend_time, - "update_at": datetime.now(), - }, + dtable_stats = ( + db_session.query(DTableRunScriptStatistics) + .filter_by(dtable_uuid=dtable_uuid, run_date=run_date) + .first() + ) + if not dtable_stats: + dtable_stats = DTableRunScriptStatistics( + dtable_uuid=dtable_uuid, + run_date=run_date, + total_run_count=1, + total_run_time=spend_time, + update_at=datetime.now(), ) + db_session.add(dtable_stats) + else: + dtable_stats.total_run_time += spend_time + dtable_stats.update_at = datetime.now() + if org_id == -1: + if "@seafile_group" not in owner: + user_stats = ( + db_session.query(UserRunScriptStatistics) + .filter_by(username=owner, run_date=run_date) + .first() + ) + if not user_stats: + user_stats = UserRunScriptStatistics( + username=owner, + run_date=run_date, + total_run_count=1, + total_run_time=spend_time, + update_at=datetime.now(), + ) + db_session.add(user_stats) + else: + user_stats.total_run_time += spend_time + user_stats.update_at = datetime.now() + else: + org_stats = ( + db_session.query(OrgRunScriptStatistics) + .filter_by(org_id=org_id, run_date=run_date) + .first() + ) + if not org_stats: + org_stats = OrgRunScriptStatistics( + org_id=org_id, + run_date=run_date, + total_run_count=1, + total_run_time=spend_time, + update_at=datetime.now(), + ) + db_session.add(org_stats) + else: + org_stats.total_run_time += spend_time + org_stats.update_at = datetime.now() db_session.commit() except Exception as e: - logger.exception("update statistics sql error: %s", e) + logger.exception( + "update stats for org_id %s owner %s dtable %s run time error %s", + org_id, + owner, + dtable_uuid, + e, + ) # required to get "script logs" in dtable-web @@ -338,27 +402,6 @@ def can_run_task(owner, org_id, db_session, scripts_running_limit=None): return count < scripts_running_limit -# 
update entries in script_log after SUB_PROCESS_TIMEOUT (typically 15 minutes) -def check_and_set_tasks_timeout(db_session): - now = datetime.now() - sql = """ - UPDATE script_log SET success=0, return_code=-1, output=:timeout_output, finished_at=:now - WHERE success IS NULL AND TIMESTAMPDIFF(SECOND, started_at, :now) > :timeout_interval - """ - try: - db_session.execute( - text(sql), - { - "now": now, - "timeout_interval": SUB_PROCESS_TIMEOUT, - "timeout_output": TIMEOUT_OUTPUT, - }, - ) - db_session.commit() - except Exception as e: - logger.exception(e) - - def get_script(db_session, script_id): script = db_session.query(ScriptLog).filter_by(id=script_id).first() @@ -381,52 +424,48 @@ def add_script( org_id, script_name, context_data, + ScriptLog.PENDING, datetime.now(), operate_from, ) db_session.add(script) db_session.commit() + update_stats_run_count(db_session, dtable_uuid, owner, org_id) + return script -def update_script(db_session, script, success, return_code, output): - script.finished_at = datetime.now() +def update_script_running(db_session, started_at, script): + script.started_at = started_at + script.state = ScriptLog.RUNNING + db_session.commit() + + +def update_script( + db_session, script, success, return_code, output, started_at, finished_at +): + script.started_at = started_at + script.finished_at = finished_at script.success = success script.return_code = return_code script.output = output + script.state = ScriptLog.FINISHED db_session.commit() return script -# run_script is called from flash_server. Initializes the process. -def run_script( - script_id, dtable_uuid, script_name, script_url, temp_api_token, context_data +def on_script_done_update( + db_session, script_id, success, return_code, output, started_at, spend_time ): - """Only for flask-server""" - # from faas_scheduler import DBSession - db_session = DBSession() # for multithreading - - try: - if not script_url: - script_file = get_script_file(dtable_uuid, script_name) - script_url = script_file.get("script_url", "") - logger.debug("run_script executed...") - call_faas_func(script_url, temp_api_token, context_data, script_id=script_id) - except Exception as e: - logger.exception("Run script %d error: %s", script_id, e) - finally: - db_session.close() - - return True - - -def hook_update_script(db_session, script_id, success, return_code, output, spend_time): script = db_session.query(ScriptLog).filter_by(id=script_id).first() if script: - update_script(db_session, script, success, return_code, output) - update_statistics( + finished_at = started_at + timedelta(seconds=spend_time) + update_script( + db_session, script, success, return_code, output, started_at, finished_at + ) + update_stats_run_time( db_session, script.dtable_uuid, script.owner, script.org_id, spend_time ) diff --git a/scheduler/app/flask_server.py b/scheduler/app/flask_server.py index fd1047e..cde710e 100644 --- a/scheduler/app/flask_server.py +++ b/scheduler/app/flask_server.py @@ -8,7 +8,6 @@ from datetime import datetime, timedelta from flask import Flask, request, make_response from gevent.pywsgi import WSGIServer -from concurrent.futures import ThreadPoolExecutor from database import DBSession from faas_scheduler.utils import ( @@ -17,45 +16,36 @@ get_statistics_grouped_by_base, get_statistics_grouped_by_day, is_date_yyyy_mm_dd, - run_script, get_script, - add_script, get_run_script_statistics_by_month, - hook_update_script, can_run_task, get_run_scripts_count_monthly, - ping_starter, get_task_log, list_task_logs, uuid_str_to_32_chars, 
basic_log, + add_script, + on_script_done_update, + update_script_running, ) - +from scheduler import Scheduler basic_log("scheduler.log") -# defaults... -SCRIPT_WORKERS = int(os.environ.get("PYTHON_SCHEDULER_SCRIPT_WORKERS", 5)) -SUB_PROCESS_TIMEOUT = int(os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15)) -TIMEOUT_OUTPUT = ( - "The script's running time exceeded the limit and the execution was aborted." -) + +HOST = os.environ.get("PYTHON_SCHEDULER_BIND_HOST", "127.0.0.1") app = Flask(__name__) +scheduler = Scheduler( + int(os.environ.get("PYTHON_SCHEDULER_WINDOW_SECS", "300")), + float(os.environ.get("PYTHON_SCHEDULER_RATE_LIMIT_PERCENT", "250")), +) logger = logging.getLogger(__name__) -executor = ThreadPoolExecutor(max_workers=SCRIPT_WORKERS) @app.route("/ping/", methods=["GET"]) def ping(): - if not ping_starter(): - return make_response( - ( - "Error: Python Scheduler can not reach the Python Starter. Check PYTHON_STARTER_URL.", - 400, - ) - ) return make_response(("Pong", 200)) @@ -78,8 +68,6 @@ def scripts_api(): context_data = data.get("context_data") owner = data.get("owner") org_id = data.get("org_id") - script_url = data.get("script_url") - temp_api_token = data.get("temp_api_token") scripts_running_limit = data.get("scripts_running_limit", -1) operate_from = data.get("operate_from", "manualy") if not dtable_uuid or not script_name or not owner: @@ -93,27 +81,19 @@ def scripts_api(): owner, org_id, db_session, scripts_running_limit=scripts_running_limit ): return make_response(("The number of runs exceeds the limit"), 400) - script = add_script( + + script_log = add_script( db_session, - dtable_uuid, + uuid_str_to_32_chars(dtable_uuid), owner, org_id, script_name, context_data, operate_from, ) - logger.debug("lets call the starter to fire up the runner...") - executor.submit( - run_script, - script.id, - dtable_uuid, - script_name, - script_url, - temp_api_token, - context_data, - ) + scheduler.add_script(script_log.to_dict()) - return make_response(({"script_id": script.id}, 200)) + return make_response(({"script_id": script_log.id}, 200)) except Exception as e: logger.exception(e) return make_response(("Internal server error", 500)) @@ -146,19 +126,12 @@ def script_api(script_id): script = get_script(db_session, script_id) if not script: return make_response(("Not found", 404)) - if dtable_uuid != script.dtable_uuid or script_name != script.script_name: + if ( + uuid_str_to_32_chars(dtable_uuid) != script.dtable_uuid + or script_name != script.script_name + ): return make_response(("Bad request", 400)) - if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): - now = datetime.now() - duration_seconds = (now - script.started_at).seconds - if duration_seconds > SUB_PROCESS_TIMEOUT: - script.success = False - script.return_code = -1 - script.finished_at = now - script.output = TIMEOUT_OUTPUT - db_session.commit() - return make_response(({"script": script.to_dict()}, 200)) except Exception as e: @@ -190,7 +163,9 @@ def task_logs_api(dtable_uuid, script_name): db_session = DBSession() try: - task_logs = list_task_logs(db_session, dtable_uuid, script_name, order_by) + task_logs = list_task_logs( + db_session, uuid_str_to_32_chars(dtable_uuid), script_name, order_by + ) count = task_logs.count() task_logs = task_logs[start:end] task_log_list = [task_log.to_dict() for task_log in task_logs] @@ -277,6 +252,36 @@ def scripts_running_count(): return make_response(({"count": count}, 200)) +# endpoint to be informed that a script starts to run. 
(from starter) +@app.route("/script-running-callback/", methods=["POST"]) +def callback_script_running(): + """ + Update script_log is running, called from python-starter + """ + try: + data = request.get_json() + except Exception: + return make_response("Bad Request.", 400) + script_id = data.get("script_id") + started_at = data.get("started_at") + + db_session = DBSession() + try: + script_log = get_script(db_session, script_id) + if not script_log: + return {"error_msg": "Script not found"}, 404 + update_script_running( + db_session, datetime.fromtimestamp(started_at), script_log + ) + scheduler.on_script_start(script_log.to_dict(), started_at) + except Exception as e: + logger.exception("update script %s running error %s", script_id, e) + finally: + db_session.close() + + return {"success": True}, 200 + + # endpoint to be informed that the execution of python code is done. (from starter) @app.route("/script-result/", methods=["POST"]) def record_script_result(): @@ -290,17 +295,26 @@ def record_script_result(): success = data.get("success", False) return_code = data.get("return_code") output = data.get("output") - spend_time = data.get("spend_time") + started_at = data.get("started_at") + spend_time = data.get("spend_time") or 0 script_id = data.get("script_id") - db_session = DBSession() - # update script_log and run-time statistics + db_session = DBSession() try: - if script_id: - hook_update_script( - db_session, script_id, success, return_code, output, spend_time - ) + script_log = get_script(db_session, script_id) + if not script_log: + return {"error_msg": "Script not found"}, 404 + on_script_done_update( + db_session, + script_id, + success, + return_code, + output, + datetime.fromtimestamp(started_at), + spend_time, + ) + scheduler.on_script_done(script_log.to_dict(), started_at + spend_time) except Exception as e: logger.exception(e) @@ -308,7 +322,7 @@ def record_script_result(): finally: db_session.close() - return "success" + return {"success": True}, 200 # internal function... 
@@ -578,5 +592,6 @@ def get_run_statistics_grouped_by_day(): if __name__ == "__main__": - http_server = WSGIServer(("127.0.0.1", 5055), app) + scheduler.start() + http_server = WSGIServer((HOST, 5055), app) http_server.serve_forever() diff --git a/scheduler/app/requirements.txt b/scheduler/app/requirements.txt index 6b5589d..3bb5214 100644 --- a/scheduler/app/requirements.txt +++ b/scheduler/app/requirements.txt @@ -7,3 +7,4 @@ requests pyjwt pytz tzlocal +redis==7.1.0 diff --git a/scheduler/app/scheduler.py b/scheduler/app/scheduler.py index 7d6bfe1..fb6452d 100644 --- a/scheduler/app/scheduler.py +++ b/scheduler/app/scheduler.py @@ -1,56 +1,287 @@ +import json +import logging import os -import gc import time -import logging -from threading import Thread +from collections import deque, defaultdict +from datetime import datetime +from threading import Thread, Lock, Event from database import DBSession +from faas_scheduler.constants import SCRIPTS_KEY +from faas_scheduler.models import ScriptLog +from faas_scheduler.redis_client import RedisClient from faas_scheduler.utils import ( - check_and_set_tasks_timeout, + get_script_file, + on_script_done_update, delete_log_after_days, delete_statistics_after_days, - basic_log, + update_running_scripts_timeout, ) -basic_log("scheduler.log") +logger = logging.getLogger(__name__) -SUB_PROCESS_TIMEOUT = int( - os.environ.get("PYTHON_PROCESS_TIMEOUT", 60 * 15) -) # 15 minutes -logger = logging.getLogger(__name__) +SEATABLE_SERVER_URL = os.environ.get("SEATABLE_SERVER_URL") +REDIS_SCRIPTS_QUEUE_MAX_SIZE = int(os.environ.get("REDIS_SCRIPTS_QUEUE_MAX_SIZE", "1")) -class FAASTaskTimeoutSetter(Thread): +class Scheduler: - def __init__(self): - super(FAASTaskTimeoutSetter, self).__init__() - self.interval = 60 * 30 # every half an hour + def __init__(self, window_secs: int, rate_limit_percent: float): + self.window_secs = window_secs + self.rate_limit_percent = rate_limit_percent - def run(self): - if SUB_PROCESS_TIMEOUT and isinstance(SUB_PROCESS_TIMEOUT, int): - while True: - logger.info("Start automatic cleanup ...") - db_session = DBSession() - try: - check_and_set_tasks_timeout(db_session) - except Exception as e: - logger.exception("task cleaner error: %s", e) - finally: - db_session.close() + self.queue = deque() - # python garbage collection - logger.info("gc.collect: %s", str(gc.collect())) + self.key_history = defaultdict(deque) # key -> [(start, end)] + self.key_running = defaultdict(dict) # key -> {script_log_id: start_time} + self.key_dispatched = defaultdict(dict) # key -> {script_log_id: dispatch_time} - # remove old script_logs and statistics - delete_log_after_days(db_session) - delete_statistics_after_days(db_session) + self.lock = Lock() + + self.wakeup_event = Event() + self.clean_db_records_event = Event() + self.update_running_scripts_timeout_event = Event() + + self.redis_client = RedisClient() + + # -------- public apis -------- + def add_script(self, script_log_info: dict): + with self.lock: + self.queue.append(script_log_info) + self.wakeup_event.set() + + def on_script_start(self, script_log_info: dict, start_time: float): + key = self.get_script_key(script_log_info) + with self.lock: + self.key_dispatched[key].pop(script_log_info["id"], None) + self.key_running[key][script_log_info["id"]] = start_time + self.wakeup_event.set() + + def on_script_done(self, script_log_info: dict, end_time: float): + key = self.get_script_key(script_log_info) + with self.lock: + start_time = self.key_running[key].pop(script_log_info["id"], None) + 
self.key_history[key].append((start_time, end_time)) + self.wakeup_event.set() + + def start(self): + self.load_pending_script_logs() + Thread(target=self.schedule, daemon=True).start() + Thread(target=self.loop_clean_db_records, daemon=True).start() + Thread(target=self.loop_update_running_scripts_timeout, daemon=True).start() + + # -------- private methods -------- + def get_script_key(self, script_log_info: dict): + if script_log_info["org_id"] == -1: + return script_log_info["owner"] + else: + return script_log_info["org_id"] + + def clean_up(self, now): + for runs in self.key_history.values(): + while runs and runs[0][1] <= now - self.window_secs: + runs.popleft() + + def get_used_time_by_key(self, key, now): + used = 0.0 + + for s, e in self.key_history[key]: + used += e - s + + # dispatched but not started scripts, use (now - dispatch_time) as used time temporarily + for d in self.key_dispatched[key].values(): + used += now - max(d, now - self.window_secs) + + return used + + def get_earliest_release_time(self, key, now): + times = [] - # sleep - logger.info("Sleep for %d seconds ...", self.interval) - time.sleep(self.interval) + for _, e in self.key_history[key]: + times.append(e + self.window_secs) + for s in self.key_running[key].values(): + times.append(s + self.window_secs) + + for s in self.key_dispatched[key].values(): + times.append(s + self.window_secs) + + return min(times) if times else now + + def run_script(self, script_log_info: dict): + now = time.time() + key = self.get_script_key(script_log_info) + self.key_dispatched[key][script_log_info["id"]] = now + + db_session = DBSession() + try: + db_session.query(ScriptLog).filter( + ScriptLog.id == script_log_info["id"] + ).update({ScriptLog.state: ScriptLog.DISPACHED}) + db_session.commit() + script_file_info = get_script_file( + script_log_info["dtable_uuid"], script_log_info["script_name"] + ) + self.redis_client.lpush( + SCRIPTS_KEY, + json.dumps( + { + "script_id": script_log_info["id"], + "script_url": script_file_info["script_url"], + "dtable_uuid": script_log_info["dtable_uuid"], + "env": { + "dtable_web_url": SEATABLE_SERVER_URL.rstrip("/"), + "api_token": script_file_info["temp_api_token"], + }, + "context_data": script_log_info["context_data"], + } + ), + ) + logger.info( + "pushed script id %s org_id %s owner %s dtable_uuid %s script_name %s", + script_log_info["id"], + script_log_info["org_id"], + script_log_info["owner"], + script_log_info["dtable_uuid"], + script_log_info["script_name"], + ) + except Exception as e: + logger.exception( + "push script id %s org_id %s owner %s dtable_uuid %s script_name %s error %s", + script_log_info["id"], + script_log_info["org_id"], + script_log_info["owner"], + script_log_info["dtable_uuid"], + script_log_info["script_name"], + e, + ) + try: + on_script_done_update( + db_session, + script_log_info["id"], + False, + -1, + "Failed", + datetime.fromtimestamp(now), + 0, + ) + except Exception as ee: + logger.exception( + "update script id %s org_id %s owner %s dtable_uuid %s script_name %s finished error %s", + script_log_info["id"], + script_log_info["org_id"], + script_log_info["owner"], + script_log_info["dtable_uuid"], + script_log_info["script_name"], + ee, + ) + finally: + db_session.close() + self.on_script_done(script_log_info, time.time()) + + def load_pending_script_logs(self): + db_session = DBSession() + try: + script_logs = list( + db_session.query(ScriptLog) + .filter_by(state=ScriptLog.PENDING) + .order_by(ScriptLog.id) + ) + logger.info("load %s pending 
scripts", len(script_logs)) + for script_log in script_logs: + self.add_script(script_log.to_dict()) + except Exception as e: + logger.exception("load pending script logs error %s", e) + finally: + db_session.close() + + # -------- scheduler loop -------- + def schedule(self): + logger.info( + "Start scheduler loop, window_secs: %s rate_limit_percent: %s%%", + self.window_secs, + self.rate_limit_percent, + ) + while True: + if self.redis_client.llen(SCRIPTS_KEY) >= REDIS_SCRIPTS_QUEUE_MAX_SIZE: + time.sleep(0.1) + continue + + now = time.time() + started = False + sleep_until = None + with self.lock: + self.clean_up(now) + + skipped = deque() + + while self.queue: + script_log_info = self.queue.popleft() + key = self.get_script_key(script_log_info) + used = self.get_used_time_by_key(key, now) + + logger.debug( + "script %s key %s used %s used_percent %s%%", + script_log_info["id"], + key, + used, + used / self.window_secs * 100, + ) + + if used / self.window_secs * 100 < self.rate_limit_percent: + logger.info( + "script %s key %s used %s start to dispatch", + script_log_info["id"], + key, + used, + ) + self.run_script(script_log_info) + started = True + break + else: + skipped.append(script_log_info) + t = self.get_earliest_release_time(key, now) + sleep_until = t if sleep_until is None else min(sleep_until, t) + + self.queue = skipped + self.queue + + if started: + continue + + if sleep_until: + logger.info( + "No script can run, sleep until %s", + datetime.fromtimestamp(sleep_until), + ) + timeout = max(0.0, sleep_until - time.time()) + self.wakeup_event.wait(timeout=timeout) + else: + self.wakeup_event.wait(timeout=0.1) + + self.wakeup_event.clear() + + # -------- clean db records loop -------- + def loop_clean_db_records(self): + while True: + self.clean_db_records_event.wait(timeout=24 * 60 * 60) + db_session = DBSession() + try: + delete_log_after_days(db_session) + delete_statistics_after_days(db_session) + except Exception as e: + logger.exception("clean db records error %s", e) + finally: + db_session.close() -if __name__ == "__main__": - task_timeout_setter = FAASTaskTimeoutSetter() - task_timeout_setter.start() + # -------- update state loop -------- + def loop_update_running_scripts_timeout(self): + while True: + self.update_running_scripts_timeout_event.wait(timeout=15 * 60) + db_session = DBSession() + try: + update_running_scripts_timeout(db_session) + except Exception as e: + logger.exception("update running scripts timeout error %s", e) + finally: + db_session.close() diff --git a/scheduler/app/upgrade/4.5.0.sql b/scheduler/app/upgrade/4.5.0.sql new file mode 100644 index 0000000..496e840 --- /dev/null +++ b/scheduler/app/upgrade/4.5.0.sql @@ -0,0 +1,9 @@ + +ALTER TABLE script_log MODIFY started_at DATETIME(6); + +ALTER TABLE script_log ADD COLUMN state VARCHAR(10); +ALTER TABLE script_log ADD KEY state_h3u8i9o1_key (state); + +ALTER TABLE script_log ADD COLUMN created_at DATETIME(6); +ALTER TABLE script_log ADD KEY created_at_h3u7y9o4_key (created_at); +UPDATE script_log SET created_at=started_at; diff --git a/starter/entrypoint.sh b/starter/entrypoint.sh index 657b286..ee13d30 100755 --- a/starter/entrypoint.sh +++ b/starter/entrypoint.sh @@ -62,12 +62,8 @@ check_starter_config() { #### check_starter_config -if [ "${LOG_TO_STDOUT:-false}" = "false" ]; then - export UWSGI_LOGTO="/opt/seatable-python-starter/logs/uwsgi.log" -fi - -echo "** uWSGI is starting now" -uwsgi --ini /opt/seatable-python-starter/uwsgi.ini 2>&1 & +echo "** starter starting now" +python -u runner.py 
sleep 1 echo "** SeaTable Python Starter ready" diff --git a/starter/logrotate/logrotate.conf b/starter/logrotate/logrotate.conf index 280fa04..fb87002 100644 --- a/starter/logrotate/logrotate.conf +++ b/starter/logrotate/logrotate.conf @@ -1,5 +1,4 @@ /opt/seatable-python-starter/logs/starter.log -/opt/seatable-python-starter/logs/uwsgi.log { daily rotate 7 diff --git a/starter/redis_client.py b/starter/redis_client.py new file mode 100644 index 0000000..3c700bb --- /dev/null +++ b/starter/redis_client.py @@ -0,0 +1,73 @@ +import logging +import os +import time + +import redis + +logger = logging.getLogger(__name__) + + +class RedisClient: + def __init__(self): + self._client = self.get_redis_client() + + def get_redis_client(self): + pool = redis.ConnectionPool( + host=os.environ.get("REDIS_HOST") or "127.0.0.1", + port=int(os.environ.get("REDIS_PORT") or "6379"), + db=int(os.environ.get("REDIS_DB") or "0"), + password=os.environ.get("REDIS_PASSWORD", ""), + socket_timeout=3, + socket_connect_timeout=3, + retry_on_timeout=True, + health_check_interval=30, + decode_responses=True, + ) + + return redis.Redis(connection_pool=pool) + + # ========= executor within retry logic ========= + + def _execute(self, func, *args, **kwargs): + retry_count = 0 + while retry_count <= 3: + try: + return func(*args, **kwargs) + + except (redis.TimeoutError, redis.ConnectionError) as e: + logger.exception(e) + retry_count += 1 + time.sleep(0.2 * 2**retry_count) + continue + + except (redis.ResponseError, redis.DataError) as e: + logger.exception(e) + raise e + + # ========= KV ========= + + def get(self, key: str): + return self._execute(self._client.get, key) + + def set(self, key: str, value, ex: int | None = None): + return self._execute(self._client.set, key, value, ex=ex) + + # ========= LIST ========= + + def lpush(self, key: str, *values): + return self._execute(self._client.lpush, key, *values) + + def rpush(self, key: str, *values): + return self._execute(self._client.rpush, key, *values) + + def lpop(self, key: str): + return self._execute(self._client.lpop, key) + + def rpop(self, key: str): + return self._execute(self._client.rpop, key) + + def exists(self, key: str) -> bool: + return bool(self._execute(self._client.exists, key)) + + def delete(self, *keys): + return self._execute(self._client.delete, *keys) diff --git a/starter/requirements.txt b/starter/requirements.txt index 707c6ba..d4978bd 100644 --- a/starter/requirements.txt +++ b/starter/requirements.txt @@ -1,3 +1,2 @@ -uwsgi -flask requests +redis==7.1.0 diff --git a/starter/runner.py b/starter/runner.py index e4cb429..e68ff19 100644 --- a/starter/runner.py +++ b/starter/runner.py @@ -7,11 +7,13 @@ import time import ast import sys +from queue import Queue from concurrent.futures import ThreadPoolExecutor from uuid import uuid4 import requests -from flask import Flask, request, make_response + +from redis_client import RedisClient working_dir = os.getcwd() @@ -70,6 +72,12 @@ SEATABLE_USER_UID = 1000 SEATABLE_USER_GID = 1000 +# bind host +HOST = os.environ.get("PYTHON_STARTER_BIND_HOST", "127.0.0.1") + +# redis +SCRIPTS_KEY = "faas_scheduler:scripts" + def get_log_level(level): if level.lower() == "info": @@ -103,7 +111,6 @@ def basic_log(log_file): basic_log("starter.log") -app = Flask(__name__) executor = ThreadPoolExecutor(THREAD_COUNT) DEFAULT_SUB_PROCESS_TIMEOUT = SUB_PROCESS_TIMEOUT @@ -132,12 +139,19 @@ def to_python_bool(value): return value.lower() == "true" -def send_to_scheduler(success, return_code, output, spend_time, 
request_data): +class CallbackScriptRunningError(Exception): + pass + + +def send_to_scheduler( + success, return_code, output, started_at, spend_time, request_data +): """ This function is used to send result of script to scheduler - success: whether script running successfully - return_code: return-code of subprocess - output: output of subprocess or error message + - started_at: start timestamp - spend_time: time subprocess took - request_data: data from request """ @@ -152,7 +166,8 @@ def send_to_scheduler(success, return_code, output, spend_time, request_data): "success": success, "return_code": return_code, "output": output, - "spend_time": spend_time, + "started_at": started_at, + "spend_time": spend_time or 0, } result_data.update( { @@ -182,13 +197,32 @@ def send_to_scheduler(success, return_code, output, spend_time, request_data): ) +def callback_script_running(script_id, started_at): + url = PYTHON_SCHEDULER_URL.strip("/") + "/script-running-callback/" + headers = {"User-Agent": "python-starter/" + VERSION} + resp = requests.post( + url, + headers=headers, + json={"script_id": script_id, "started_at": started_at}, + timeout=30, + ) + if not resp.ok: + raise CallbackScriptRunningError( + f"script {script_id} callback script running error status {resp.status_code} content {resp.content}" + ) + + def run_python(data): logging.info("New python run initalized... (v%s)", VERSION) + started_at = time.time() + + callback_script_running(data.get("script_id"), started_at) + script_url = data.get("script_url") if not script_url: - send_to_scheduler(False, None, "Script URL is missing", None, data) + send_to_scheduler(False, None, "Script URL is missing", started_at, None, data) return if ( to_python_bool(USE_ALTERNATIVE_FILE_SERVER_ROOT) @@ -228,11 +262,11 @@ def run_python(data): logging.error( "Failed to get script from %s, response: %s", script_url, resp ) - send_to_scheduler(False, None, "Fail to get script", None, data) + send_to_scheduler(False, None, "Fail to get script", started_at, None, data) return except Exception as e: logging.error("Failed to get script from %s, error: %s", script_url, e) - send_to_scheduler(False, None, "Fail to get script", None, data) + send_to_scheduler(False, None, "Fail to get script", started_at, None, data) return logging.debug("Generate temporary random folder directory") @@ -264,13 +298,15 @@ def run_python(data): return_code, output = None, "" # init output except Exception as e: logging.error("Failed to save script %s, error: %s", script_url, e) + send_to_scheduler(False, -1, "", started_at, 0, data) return try: logging.debug("Fix ownership of %s", tmp_dir) - os.chown(tmp_dir, SEATABLE_USER_UID, SEATABLE_USER_GID) + # os.chown(tmp_dir, SEATABLE_USER_UID, SEATABLE_USER_GID) except Exception as e: logging.error("Failed to chown %s, error: %s", tmp_dir, e) + send_to_scheduler(False, -1, "", started_at, 0, data) return logging.debug("prepare the command to start the python runner") @@ -339,8 +375,6 @@ def run_python(data): command.append("run") # override command logging.debug("command: %s", command) - start_at = time.time() - logging.debug("try to start the python runner image") try: result = subprocess.run( @@ -371,6 +405,7 @@ def run_python(data): False, -1, "The script's running time exceeded the limit and the execution was aborted.", + started_at, DEFAULT_SUB_PROCESS_TIMEOUT, data, ) @@ -378,7 +413,7 @@ def run_python(data): except Exception as e: logging.exception(e) logging.error("Failed to run file %s error: %s", script_url, e) - 
send_to_scheduler(False, None, None, None, data) + send_to_scheduler(False, None, None, started_at, None, data) return else: logging.debug( @@ -388,7 +423,12 @@ def run_python(data): if os.path.isfile(output_file_path): if os.path.islink(output_file_path): send_to_scheduler( - False, -1, "Script invalid!", time.time() - start_at, data + False, + -1, + "Script invalid!", + started_at, + time.time() - started_at, + data, ) return with open(output_file_path, "r") as f: @@ -418,7 +458,7 @@ def run_python(data): except Exception as e: logging.warning("Fail to remove container error: %s", e) - spend_time = time.time() - start_at + spend_time = time.time() - started_at logging.info("python run finished successful. duration was: %s", spend_time) logging.debug( "send this to the scheduler. return_code: %s, output: %s, spend_time: %s, data: %s", @@ -427,36 +467,47 @@ def run_python(data): spend_time, data, ) - send_to_scheduler(return_code == 0, return_code, output, spend_time, data) - - -#################### - + send_to_scheduler( + return_code == 0, return_code, output, started_at, spend_time, data + ) -@app.route("/ping/", methods=["POST", "GET"]) -def ping(): - return make_response(("Pong", 200)) +class Runner: -@app.route("/", defaults={"path": ""}, methods=["POST", "GET"]) -@app.route("/function/run-python", methods=["POST", "GET"]) -def main_route(): - try: - data = request.get_json() - except Exception: - return make_response("Bad Request.", 400) - try: - executor.submit(run_python, data) - except Exception as e: - logging.error(e) - return make_response("Internal Server Error.", 500) - return make_response("Received", 200) + def __init__(self): + self.queue = Queue(maxsize=1) + def worker(self): + while True: + data = self.queue.get() + try: + run_python(data) + except Exception as e: + logging.exception(e) -@app.route("/_/health", methods=["POST", "GET"]) -def health_check(): - return "Everything is ok." + def start(self): + workers_executor = ThreadPoolExecutor( + max_workers=THREAD_COUNT, thread_name_prefix="worker-" + ) + for _ in range(THREAD_COUNT): + workers_executor.submit(self.worker) + logging.info("Started %s workers", THREAD_COUNT) + redis_client = RedisClient() + while True: + try: + data = redis_client.rpop(SCRIPTS_KEY) + if data: + data = json.loads(data) + logging.info( + "get script %s dtable_uuid %s", + data["script_id"], + data["dtable_uuid"], + ) + self.queue.put(data) + except Exception as e: + logging.exception(e) + redis_client = RedisClient() if __name__ == "__main__": - app.run(port=8088, debug=False) + Runner().start() diff --git a/starter/uwsgi.ini b/starter/uwsgi.ini deleted file mode 100644 index a30b59d..0000000 --- a/starter/uwsgi.ini +++ /dev/null @@ -1,12 +0,0 @@ -[uwsgi] -http = :8080 -wsgi-file = /opt/seatable-python-starter/runner.py -callable = app -process = 4 -threads = 2 -vacuum = true -buffer-size = 65536 -max-fd = 1024 -stats = 127.0.0.1:9191 -procname-prefix = run-python -log-level = info
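
Note on the new dispatch gate: the Scheduler throttles per key (the owner for personal bases with org_id == -1, otherwise the org_id). A queued script is only dispatched while that key's recent run time stays below rate_limit_percent of a sliding window; with the defaults wired up in flask_server.py (PYTHON_SCHEDULER_WINDOW_SECS=300, PYTHON_SCHEDULER_RATE_LIMIT_PERCENT=250) that allows at most 750 script-seconds per key inside the trailing five minutes. A minimal sketch of that check, assuming only finished runs count (the real loop also charges dispatched-but-not-yet-started scripts toward the budget):

WINDOW_SECS = 300            # PYTHON_SCHEDULER_WINDOW_SECS default
RATE_LIMIT_PERCENT = 250.0   # PYTHON_SCHEDULER_RATE_LIMIT_PERCENT default

def may_dispatch(finished_runs, window_secs=WINDOW_SECS, limit=RATE_LIMIT_PERCENT):
    """finished_runs: (start, end) timestamps of runs that ended inside the window."""
    used = sum(end - start for start, end in finished_runs)
    return used / window_secs * 100 < limit

# Example: three 200-second runs inside the last five minutes use 600 s,
# i.e. 200 % of the window -- still under 250 %, so a fourth script may start.
print(may_dispatch([(0, 200), (210, 410), (420, 620)]))  # True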
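
Note on the Redis handoff: the HTTP hop from scheduler to starter (call_faas_func posting to /function/run-python) is replaced by a Redis list. Scheduler.run_script() lpushes one JSON document per script onto faas_scheduler:scripts, and only while llen stays below REDIS_SCRIPTS_QUEUE_MAX_SIZE; the starter's Runner rpops it, so jobs leave the list in FIFO order. A rough sketch of both ends against a local Redis, with placeholder values standing in for the real script URL, token and dtable_uuid:

import json
import redis

r = redis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)
SCRIPTS_KEY = "faas_scheduler:scripts"

# scheduler side: enqueue one job (placeholder values, not a real token or URL)
r.lpush(SCRIPTS_KEY, json.dumps({
    "script_id": 42,
    "script_url": "https://example.com/script.py",
    "dtable_uuid": "0" * 32,
    "env": {"dtable_web_url": "https://example.com", "api_token": "tmp-token"},
    "context_data": None,
}))

# starter side: dequeue and hand the payload to a worker thread
raw = r.rpop(SCRIPTS_KEY)
if raw:
    job = json.loads(raw)
    print(job["script_id"], job["dtable_uuid"])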
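
Note on the script_log lifecycle: a record now walks through four states. add_script() stores it as pending, Scheduler.run_script() marks it dispatched just before pushing to Redis, the starter's call to /script-running-callback/ flips it to running, and /script-result/ (or the update_running_scripts_timeout sweep) finishes it. The sketch below shows the two callbacks from the starter's side with the fields the new Flask endpoints read; the scheduler URL is a placeholder (the starter builds it from PYTHON_SCHEDULER_URL), and any extra headers or fields the real send_to_scheduler() adds are left out:

import time
import requests

SCHEDULER_URL = "http://127.0.0.1:5055"  # placeholder; flask_server listens on port 5055

started_at = time.time()
requests.post(SCHEDULER_URL + "/script-running-callback/",
              json={"script_id": 42, "started_at": started_at}, timeout=30)

# ... the runner container executes the script here ...

requests.post(SCHEDULER_URL + "/script-result/",
              json={"script_id": 42, "success": True, "return_code": 0,
                    "output": "done", "started_at": started_at,
                    "spend_time": time.time() - started_at},
              timeout=30)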