/Library/New_Server/opt/local/pl.procesy5
// /opt/local/pl.procesy5/async_jobs - TODO: APP_PATH_ASYNC_JOB from .htaccess or config
// Config example:
// cat SE/config/.cnf-biuro.biall-net.pl.ini.php:
// APP_PATH_ASYNC_JOB="/opt/local/pl.procesy5/async_jobs"
// register new job:
// - $TODAY = date("Y-m-d");
// - $JOB_ID = generateJobID if not already started?
// - make folder APP_PATH_ASYNC_JOB / $TODAY / job-{$JOB_ID}
// - make base files:
// - log files: `out.log`, `error.log`
// - `output`: if process creates file. @param --out_file=out
// - `output_type`: output mime type or namespace
// - `progress`: if process implement progress. @param --progress_file=progress
class Core_AsyncJobs {
static $VERSION = 1; // `CRM_CONFIG`.`CONF_KEY` = 'Core_AsyncJobs__version'
static $CRM_CONFIG_VERSION_KEY = 'Core_AsyncJobs__version';
static function getSimpleList() {
$fullList = self::getFullList();
return array_map(function ($jobInfo) {
return [
'name' => $jobInfo['name'],
'pid' => $jobInfo['pid'],
'pm_id' => $jobInfo['pm_id'],
'pm2_env.status' => $jobInfo['pm2_env']['status'],
'monit.memory' => $jobInfo['monit']['memory'],
'monit.cpu' => $jobInfo['monit']['cpu'],
// "pm_out_log_path": "/Library/WebServer/.pm2/logs/test-job-1-out.log",
// "pm_err_log_path": "/Library/WebServer/.pm2/logs/test-job-1-error.log",
// "pm_pid_path": "/Library/WebServer/.pm2/pids/test-job-0.pid",
'pm2_env.pm_out_log_path' => $jobInfo['pm2_env']['pm_out_log_path'],
'pm2_env.pm_err_log_path' => $jobInfo['pm2_env']['pm_err_log_path'],
'pm2_env.pm_pid_path' => $jobInfo['pm2_env']['pm_pid_path'],
];
}, $fullList);
}
static function getFullList() {
$jobListJson = V::shell_exec("pm2 jlist 2>&1");
if (empty($jobListJson)) throw new Exception("Reading async job list failed");
$parsedJobList = @json_decode($jobListJson, $assoc = true);
if (null == $parsedJobList && 0 !== json_last_error()) throw new Exception("Parsing async job list failed: " . json_last_error());
return $parsedJobList;
}
static function getJobLogs($jobNameOrID, $lastLines = 10) {
if (empty($jobNameOrID) && $jobNameOrID !== 0) throw new Exception("Missing job name or id in getJobLogs");
$lastLines = ((int)$lastLines > 0) ? (int)$lastLines : 10;
// $cmd = "pm2 logs {$jobNameOrID} --lines {$lastLines} --nostream | tail -n {$lastLines}";
$cmd = "pm2 logs {$jobNameOrID} --lines {$lastLines} --nostream";
return V::shell_exec($cmd);
// [TAILING] Tailing last 3 lines for [0] process (change the value with --lines option)
// /Library/WebServer/.pm2/logs/test-job-1-out.log last 3 lines:
// /Library/WebServer/
}
static function stopJob($jobNameOrID) {
if (empty($jobNameOrID) && $jobNameOrID !== 0) throw new Exception("Missing job name or id in stopJob");
$cmd = "pm2 stop {$jobNameOrID}";
return V::shell_exec($cmd);
}
static function startJob($jobNameOrID) {
if (empty($jobNameOrID) && $jobNameOrID !== 0) throw new Exception("Missing job name or id in startJob");
$cmd = "pm2 start {$jobNameOrID}";
return V::shell_exec($cmd);
}
static function startNewJob($jobName) {
// for Ant:
// index.php?_route=UrlAction_Ant&_task=ant&path=default_db.in7_dziennik_koresp/etykieta&typeName=default_db:IN7_DZIENNIK_KORESP&primaryKey=66263&primaryKeyField=ID
// index.php?_route=UrlAction_Ant
// & _task=ant
// & path=default_db.in7_dziennik_koresp/test-bash
// & typeName=default_db:IN7_DZIENNIK_KORESP
// & primaryKey=66263
// & primaryKeyField=ID
// index.php?_route=UrlAction_Ant
// & _task=ant
// & path=default_db.in7_dziennik_koresp/test-bash
// & template=test-loop
// & typeName=default_db:IN7_DZIENNIK_KORESP
// & primaryKey=66263
// & primaryKeyField=ID
// ant=default_db:IN7_DZIENNIK_KORESP/test-bash & task=test-loop & ns=default_db:IN7_DZIENNIK_KORESP & pk=66263
if (empty($jobName)) throw new Exception("Missing job name");
// $jobName - check if already started? pm2 will return failed
$outLogPath = "p5-async-jobs/jobX/logs/out.log";
$errorLogPath = "p5-async-jobs/jobX/logs/error.log";
$jobExecPath = ""; // TODO: path to exec
$args = ""; // args for script
$cmd = implode(" ", [
"pm2 start '{$jobExecPath}'",
"--name '{$jobName}'",
"--no-autorestart",
"--output '{$outLogPath}'",
"--error '{$errorLogPath}'",
"--time", // prefix time to log entry
"-- {$args}",
]);
return V::shell_exec($cmd);
}
static function deleteStopped() {
// TODO: script to remove stopped in loop with delay
// pm2 start app.js --restart-delay=3000
// $ pm2 list | grep '^│' | awk -F'│' '{ gsub(/ /, "", $2); gsub(/ /, "", $10); if ("stopped" == $10) { print $2" # STOPPED" } else { print $10 } }' | grep '# STOPPED' | xargs -n1 pm2 delete
$testCmd = implode(" | ", [
"pm2 list",
"grep '^│'",
"awk -F'│' '{ gsub(/ /, \"\", \$2); gsub(/ /, \"\", \$10); if (\"stopped\" == \$10) { print \$2\" # STOPPED\" } else { print \$10 } }'",
"grep '# STOPPED'",
"awk '{print \$1}'",
]);
V::exec($testCmd . " 2>&1", $out, $ret);
echo "cmd: {$cmd}
RETURN CODE: '{$ret}'
OUTPUT:\n" . implode("\n", $out) . "";
$cmd = implode(" | ", [
"pm2 list",
"grep '^│'",
"awk -F'│' '{ gsub(/ /, \"\", \$2); gsub(/ /, \"\", \$10); if (\"stopped\" == \$10) { print \$2\" # STOPPED\" } else { print \$10 } }'",
"grep '# STOPPED'",
"awk '{print \$1}'",
"xargs -n1 pm2 delete",
]);
}
static function getNodePath() { return "/usr/local/bin/node"; }
// npm_path="/usr/local/bin/npm"
static function getPm2Path() { return "/usr/local/bin/pm2"; }
static function getPm2WwwUserPath() { return "/Library/WebServer/.pm2/"; }
static function isInstalled() {
$confAsyncPath = Config::get('APP_PATH_ASYNC_JOB');
if (!$confAsyncPath) throw new Exception("Missing Config APP_PATH_ASYNC_JOB");
if (!file_exists($confAsyncPath)) {
mkdir($confAsyncPath, $mode = 0777, $recursive = TRUE);
}
if (!file_exists($confAsyncPath)) throw new Exception("Folder not exists APP_PATH_ASYNC_JOB");
// V::exec("/usr/local/bin/pm2 --version 2>&1", $out, $ret);
V::exec("/usr/local/bin/pm2 ping 2>&1", $out, $ret); // expected "{ msg: 'pong' }"
// echo UI::h('pre', [], "ret({$ret}):\n" . implode("\n", $out));
if ($ret === 0) {
// [PM2] Spawning PM2 daemon with pm2_home=/Library/WebServer/.pm2
// [PM2] PM2 Successfully daemonized
// { msg: 'pong' }
}
if ($ret !== 0) {
if (!file_exists(self::getNodePath())) throw new Exception("pm2 not installed");
// if [ ! -f "$node_path" ]; then
// echo "$node_path not exists"
// wget https://nodejs.org/dist/v12.15.0/node-v12.15.0.pkg
// sudo installer -verbose -pkg node-v12.15.0.pkg -target /
// fi
// # node -v # expected v12.15.0
if (!file_exists(self::getPm2Path())) throw new Exception("pm2 not installed");
// npm install -g pm2
// sudo npm install -g pm2
// pm2 -version # expected 4.2.3
if (!file_exists(self::getPm2WwwUserPath())) throw new Exception("pm2 user folder not exists");
// FIX for Mac OS:
// $ sudo mkdir /Library/WebServer/.pm2/
// $ sudo chown _www /Library/WebServer/.pm2/
throw new Exception("Error pm2"); // unknown error
}
if (!self::checkAsyncJobDatabase()) throw new Exception("Error database schema for AsyncJobs");
return true;
}
static function checkAsyncJobDatabase() {
// `CRM_CONFIG`.`CONF_KEY` = 'Core_AsyncJobs__version'
$dbVersion = (int)DB::getPDO()->fetchValue(" select CONV_VAL from CRM_CONFIG where CONF_KEY = :key ", [ ':key' => self::$CRM_CONFIG_VERSION_KEY ]);
if ($dbVersion < 1) self::upgradeAsyncJobDatabaseToVersion1();
// $dbVersion = (int)DB::getPDO()->fetchValue(" select CONV_VAL from CRM_CONFIG where CONF_KEY = :key ", [ ':key' => 'Core_AsyncJobs__version' ]);
// if ($dbVersion < 2) self::upgradeAsyncJobDatabaseToVersion2();
$dbVersion = (int)DB::getPDO()->fetchValue(" select CONV_VAL from CRM_CONFIG where CONF_KEY = :key ", [ ':key' => self::$CRM_CONFIG_VERSION_KEY ]);
return ($dbVersion < self::$VERSION) ? false : true;
}
static function upgradeAsyncJobDatabaseToVersion1() {
// - insertOrUpdate new row in `CRM_ASYNC_FUNCTIONS` ( $TODAY, $version, $user, $jobName )
// - $JOB_ID = fetchValue select ID from `CRM_ASYNC_FUNCTIONS` where JOB_NAME = $jobName
// - `CRM_ASYNC_FUNCTIONS`.`A_STATUS` default 'WAITING' - not started
// - `CRM_ASYNC_FUNCTIONS`.`A_STATUS`: 'NORMAL' - started
// - `CRM_ASYNC_FUNCTIONS`.`A_STATUS`: 'OFF_HARD' - not running
// - `CRM_ASYNC_FUNCTIONS`.`A_STATUS`: 'DELETED' - removed
$sql = "
CREATE TABLE IF NOT EXISTS `CRM_ASYNC_FUNCTIONS` ( -- list of async function definitions / config
`ID` int(11) NOT NULL AUTO_INCREMENT,
`ID_ZASOB` int(11) NOT NULL DEFAULT 0, -- TODO - register function in CRM_LISTA_ZASOBOW URL_ACTION to set perms
`JOB_NAME` varchar(200) NOT NULL,
`VERSION` int(11) NOT NULL DEFAULT 0,
`LOCK_TYPE` enum('ROW', 'TABLE', 'SYSTEM', 'NO_LOCK') DEFAULT 'ROW',
-- ROW - only one active job per row, eg. create pdf, close FV, @require primaryKey in JOB.LOCK_VALUE
-- TABLE - ony one active job per table, eg. update columns, make report, sync
-- SYSTEM - only one active job
-- TODO USER_... - lock per user
-- NO_LOCK - allow multiple jobs
-- `USER` varchar(20) NOT NULL,
-- `DATE` date NOT NULL,
-- `A_STATUS` enum('WAITING','NORMAL','WARNING','OFF_SOFT','OFF_HARD','DELETED') DEFAULT 'WAITING',
`A_RECORD_CREATE_DATE` datetime NOT NULL,
`A_RECORD_CREATE_AUTHOR` varchar(20) NOT NULL,
`A_RECORD_UPDATE_DATE` datetime NOT NULL DEFAULT '0000-00-00 00:00:00',
`A_RECORD_UPDATE_AUTHOR` varchar(20) NOT NULL,
PRIMARY KEY (`ID`),
UNIQUE KEY `JOB_NAME` (`JOB_NAME`),
KEY `DATE` (`DATE`),
KEY `A_STATUS` (`A_STATUS`),
KEY `A_RECORD_UPDATE_DATE` (`A_RECORD_UPDATE_DATE`)
) ENGINE=MyISAM DEFAULT CHARSET=latin2;
";
$sql = "
CREATE TABLE IF NOT EXISTS `CRM_ASYNC_JOB_LOG` (
`ID` int(11) NOT NULL AUTO_INCREMENT,
`ID_FUNCTION` int(11) NOT NULL,
`ID_SOURCE_JOB` int(11) NOT NULL DEFAULT 0, -- id job
`P_ID` int(11) NOT NULL DEFAULT 0, -- parent job id
`DATE` date NOT NULL,
`VERSION` int(11) NOT NULL DEFAULT 0,
`JOB_NAME` varchar(200) NOT NULL, -- copy from CRM_ASYNC_FUNCTIONS
`LOCK_VALUE` varchar(200) NOT NULL DEFAULT '', -- second part for unique JOB_NAME eg. primaryKey
`USER` varchar(20) NOT NULL, -- user who start this function
`A_STATUS` enum('WAITING','NORMAL','WARNING','OFF_SOFT','OFF_HARD','DELETED') DEFAULT 'WAITING',
`A_RECORD_CREATE_DATE` datetime NOT NULL,
`A_RECORD_CREATE_AUTHOR` varchar(20) NOT NULL,
`A_RECORD_UPDATE_DATE` datetime NOT NULL DEFAULT '0000-00-00 00:00:00',
`A_RECORD_UPDATE_AUTHOR` varchar(20) NOT NULL,
PRIMARY KEY (`ID`),
UNIQUE KEY `JOB_LOCK` (`JOB_NAME`, `LOCK_VALUE`, `USER`), -- add USER to store user click and respond that job is running by other user
KEY `DATE` (`DATE`),
KEY `A_STATUS` (`A_STATUS`),
KEY `A_RECORD_UPDATE_DATE` (`A_RECORD_UPDATE_DATE`)
) ENGINE=MyISAM DEFAULT CHARSET=latin2;
";
// @usage:
// `CRM_ASYNC_FUNCTIONS`.`JOB_NAME` = '
// route = ant
// -- & path = default_db.in7_dziennik_koresp/test-bash
// & template = test-loop
// & typeName = default_db:IN7_DZIENNIK_KORESP
// & primaryKey = 66263
// & primaryKeyField = ID
// -> `CRM_ASYNC_FUNCTIONS`.`JOB_NAME` =
// namespace = default_db/IN7_DZIENNIK_KORESP
// ant = test-bash
// template = test-loop
// primaryKey = 66263
// -> `CRM_ASYNC_FUNCTIONS`.`JOB_NAME` Format: "{$type}|..."
// -> `CRM_ASYNC_FUNCTIONS`.`JOB_NAME` Format: "Ant|{$namespace}"
// -> `CRM_ASYNC_JOB_LOG`.`LOCK_VALUE` Format: "{$primaryKey}|{$func_name}|..."
// -> `CRM_ASYNC_JOB_LOG`.`LOCK_VALUE` Format: "{$primaryKey}|{$func_name}|{$template}"
// -> Example: JOB_NAME = 'Ant|default_db/IN7_DZIENNIK_KORESP', LOCK_VALUE = '66263|test-bash|test-loop'
// Add fields:
// - function config at execution time:
// - eg LOCK by user | feature | namespace | no_lock -- should contain in name
// - process result - last result - NO - result from file, separate request needed
// IDEA: 2 tables:
// - `Config` - for function config (create row if not exists)
// - `Log` - for log where JOB_ID is created or used from Config table
// then createNewJob:
// 1. fetch job config from CRM_ASYNC_FUNCTIONS_CONFIG
// 1.1 IF 404 then create row, read config from function (how?)
// 1.2 IF exists then check status and lock config
// 1.2.1 IF online and lock then return error
// 1.2.2 IF !online and !lock then create new row in Log table and return correct JOB_ID
self::_upgdateAsyncJobDatabaseVersion($version = 1);
}
static function upgradeAsyncJobDatabaseToVersion2() {
// ...
self::_upgdateAsyncJobDatabaseVersion($version = 2);
}
static function _upgdateAsyncJobDatabaseVersion($version) {
DB::getPDO()->insertOrUpdate('CRM_CONFIG', [
'CONF_KEY' => self::$CRM_CONFIG_VERSION_KEY,
'@insert' => [
'CONF_VAL' => $version,
],
'@update' => [
'CONF_VAL' => $version,
]
]);
}
// pm2 logs -h
//
// Usage: logs [options] [id|name]
// stream logs file. Default stream all logs
// Options:
// --json json log output
// --format formated log output
// --raw raw output
// --err only shows error output
// --out only shows standard output
// --lines