golden hour
/opt/saltstack/salt/lib/python3.10/site-packages/salt/utils
⬆️ Go Up
Upload
File/Folder
Size
Actions
__init__.py
237 B
Del
OK
__pycache__
-
Del
OK
aggregation.py
5.17 KB
Del
OK
ansible.py
1.48 KB
Del
OK
args.py
18.33 KB
Del
OK
asynchronous.py
4.06 KB
Del
OK
atomicfile.py
5.33 KB
Del
OK
aws.py
20.37 KB
Del
OK
azurearm.py
11.42 KB
Del
OK
beacons.py
517 B
Del
OK
boto3mod.py
8.36 KB
Del
OK
boto_elb_tag.py
3.02 KB
Del
OK
botomod.py
7.98 KB
Del
OK
cache.py
11.49 KB
Del
OK
channel.py
489 B
Del
OK
cloud.py
116.3 KB
Del
OK
color.py
2.72 KB
Del
OK
compat.py
1.89 KB
Del
OK
configcomparer.py
3.88 KB
Del
OK
configparser.py
10.82 KB
Del
OK
context.py
6.8 KB
Del
OK
crypt.py
5 KB
Del
OK
ctx.py
1.42 KB
Del
OK
data.py
53.03 KB
Del
OK
dateutils.py
2.3 KB
Del
OK
debug.py
4.19 KB
Del
OK
decorators
-
Del
OK
dictdiffer.py
16.48 KB
Del
OK
dicttrim.py
3.9 KB
Del
OK
dictupdate.py
11.33 KB
Del
OK
dns.py
35.21 KB
Del
OK
doc.py
2.25 KB
Del
OK
dockermod
-
Del
OK
entrypoints.py
1.83 KB
Del
OK
environment.py
2.2 KB
Del
OK
error.py
1.18 KB
Del
OK
etcd_util.py
33.25 KB
Del
OK
event.py
52.45 KB
Del
OK
extend.py
8.87 KB
Del
OK
extmods.py
6.04 KB
Del
OK
filebuffer.py
3.15 KB
Del
OK
files.py
27.94 KB
Del
OK
find.py
22.08 KB
Del
OK
fsutils.py
3.29 KB
Del
OK
functools.py
6.02 KB
Del
OK
gitfs.py
130.41 KB
Del
OK
github.py
1.52 KB
Del
OK
gzip_util.py
2.86 KB
Del
OK
hashutils.py
5.91 KB
Del
OK
http.py
33.9 KB
Del
OK
iam.py
1.22 KB
Del
OK
icinga2.py
754 B
Del
OK
idem.py
1.22 KB
Del
OK
immutabletypes.py
2.46 KB
Del
OK
itertools.py
2.36 KB
Del
OK
jid.py
3 KB
Del
OK
jinja.py
33.92 KB
Del
OK
job.py
6.89 KB
Del
OK
json.py
3.78 KB
Del
OK
kickstart.py
41.04 KB
Del
OK
kinds.py
493 B
Del
OK
lazy.py
3.06 KB
Del
OK
listdiffer.py
10.9 KB
Del
OK
locales.py
2.06 KB
Del
OK
mac_utils.py
14.01 KB
Del
OK
mako.py
3.97 KB
Del
OK
master.py
29.7 KB
Del
OK
mattermost.py
1.77 KB
Del
OK
memcached.py
3.56 KB
Del
OK
migrations.py
1.46 KB
Del
OK
mine.py
3.68 KB
Del
OK
minion.py
4.13 KB
Del
OK
minions.py
43.38 KB
Del
OK
mount.py
1.15 KB
Del
OK
msazure.py
5.36 KB
Del
OK
msgpack.py
4.69 KB
Del
OK
nacl.py
13.65 KB
Del
OK
namecheap.py
4.32 KB
Del
OK
napalm.py
23.22 KB
Del
OK
nb_popen.py
7.24 KB
Del
OK
network.py
73.86 KB
Del
OK
nxos.py
12.94 KB
Del
OK
nxos_api.py
4 KB
Del
OK
odict.py
13.21 KB
Del
OK
openstack
-
Del
OK
oset.py
6.41 KB
Del
OK
pagerduty.py
3.03 KB
Del
OK
parsers.py
122.69 KB
Del
OK
path.py
11.24 KB
Del
OK
pbm.py
9.81 KB
Del
OK
pkg
-
Del
OK
platform.py
5.84 KB
Del
OK
powershell.py
4.15 KB
Del
OK
preseed.py
2.64 KB
Del
OK
process.py
40.76 KB
Del
OK
profile.py
3.21 KB
Del
OK
proxy.py
331 B
Del
OK
psutil_compat.py
3.63 KB
Del
OK
pushover.py
4.51 KB
Del
OK
pycrypto.py
5.41 KB
Del
OK
pydsl.py
13.74 KB
Del
OK
pyobjects.py
10.75 KB
Del
OK
reactor.py
18.99 KB
Del
OK
reclass.py
752 B
Del
OK
roster_matcher.py
3.55 KB
Del
OK
rsax931.py
8.42 KB
Del
OK
s3.py
8.78 KB
Del
OK
saltclass.py
14.27 KB
Del
OK
sanitizers.py
2.51 KB
Del
OK
schedule.py
71.81 KB
Del
OK
schema.py
54.26 KB
Del
OK
sdb.py
4.04 KB
Del
OK
slack.py
3.58 KB
Del
OK
smb.py
11.16 KB
Del
OK
smtp.py
3.27 KB
Del
OK
ssdp.py
14.75 KB
Del
OK
ssh.py
769 B
Del
OK
state.py
8.43 KB
Del
OK
stringio.py
355 B
Del
OK
stringutils.py
16.95 KB
Del
OK
systemd.py
5.51 KB
Del
OK
templates.py
24.03 KB
Del
OK
textformat.py
5.03 KB
Del
OK
thin.py
31.91 KB
Del
OK
timed_subprocess.py
4.06 KB
Del
OK
timeout.py
1.53 KB
Del
OK
timeutil.py
2.4 KB
Del
OK
url.py
5 KB
Del
OK
user.py
11.86 KB
Del
OK
validate
-
Del
OK
value.py
247 B
Del
OK
vault.py
21.74 KB
Del
OK
verify.py
25.34 KB
Del
OK
versions.py
17.17 KB
Del
OK
virt.py
3.24 KB
Del
OK
virtualbox.py
22.43 KB
Del
OK
vmware.py
129.74 KB
Del
OK
vsan.py
17.18 KB
Del
OK
vt.py
31.47 KB
Del
OK
vt_helper.py
4.4 KB
Del
OK
win_chcp.py
3.7 KB
Del
OK
win_dacl.py
95.49 KB
Del
OK
win_dotnet.py
4.74 KB
Del
OK
win_functions.py
12.69 KB
Del
OK
win_lgpo_auditpol.py
8.48 KB
Del
OK
win_lgpo_netsh.py
17.87 KB
Del
OK
win_lgpo_reg.py
16.98 KB
Del
OK
win_network.py
16.35 KB
Del
OK
win_osinfo.py
2.83 KB
Del
OK
win_pdh.py
13.85 KB
Del
OK
win_reg.py
30.82 KB
Del
OK
win_runas.py
10.53 KB
Del
OK
win_service.py
5.2 KB
Del
OK
win_system.py
14.47 KB
Del
OK
win_update.py
40.36 KB
Del
OK
winapi.py
818 B
Del
OK
x509.py
73.19 KB
Del
OK
xdg.py
316 B
Del
OK
xmlutil.py
13.91 KB
Del
OK
yaml.py
349 B
Del
OK
yamldumper.py
3.37 KB
Del
OK
yamlencoding.py
1.55 KB
Del
OK
yamllint.py
1.61 KB
Del
OK
yamlloader.py
6.04 KB
Del
OK
yamlloader_old.py
8.15 KB
Del
OK
yast.py
619 B
Del
OK
zeromq.py
1.74 KB
Del
OK
zfs.py
19.15 KB
Del
OK
Edit: schedule.py
# See doc/topics/jobs/index.rst """ Scheduling routines are located here. To activate the scheduler make the ``schedule`` option available to the master or minion configurations (master config file or for the minion via config or pillar). Detailed tutorial about scheduling jobs can be found :ref:`here <scheduling-jobs>`. Requires that python-dateutil is installed on the minion. """ import copy import datetime import errno import itertools import logging import os import random import signal import sys import threading import time import weakref import salt.config import salt.defaults.exitcodes import salt.exceptions import salt.loader import salt.minion import salt.payload import salt.syspaths import salt.utils.args import salt.utils.error import salt.utils.event import salt.utils.files import salt.utils.jid import salt.utils.master import salt.utils.minion import salt.utils.platform import salt.utils.process import salt.utils.stringutils import salt.utils.user import salt.utils.yaml from salt.exceptions import SaltInvocationError from salt.utils.odict import OrderedDict # pylint: disable=import-error try: import dateutil.parser as dateutil_parser _WHEN_SUPPORTED = True _RANGE_SUPPORTED = True except ImportError: _WHEN_SUPPORTED = False _RANGE_SUPPORTED = False try: import croniter _CRON_SUPPORTED = True except ImportError: _CRON_SUPPORTED = False # pylint: enable=import-error log = logging.getLogger(__name__) class Schedule: """ Create a Schedule object, pass in the opts and the functions dict to use """ instance = None def __new__( cls, opts, functions, returners=None, intervals=None, cleanup=None, proxy=None, standalone=False, new_instance=False, utils=None, _subprocess_list=None, ): """ Only create one instance of Schedule """ if cls.instance is None or new_instance is True: log.debug("Initializing new Schedule") # we need to make a local variable for this, as we are going to store # it in a WeakValueDictionary-- which will remove the item if no one # 
references it-- this forces a reference while we return to the caller instance = object.__new__(cls) instance.__singleton_init__( opts, functions, returners=returners, intervals=intervals, cleanup=cleanup, proxy=proxy, standalone=standalone, utils=utils, _subprocess_list=_subprocess_list, ) if new_instance is True: return instance cls.instance = instance else: log.debug("Re-using Schedule") return cls.instance # has to remain empty for singletons, since __init__ will *always* be called def __init__( self, opts, functions, returners=None, intervals=None, cleanup=None, proxy=None, standalone=False, new_instance=False, utils=None, _subprocess_list=None, ): pass # an init for the singleton instance to call def __singleton_init__( self, opts, functions, returners=None, intervals=None, cleanup=None, proxy=None, standalone=False, utils=None, _subprocess_list=None, ): self.opts = opts self.proxy = proxy self.functions = functions self.utils = utils or salt.loader.utils(opts) self.standalone = standalone self.skip_function = None self.skip_during_range = None self.splay = None self.enabled = True if isinstance(intervals, dict): self.intervals = intervals else: self.intervals = {} if not self.standalone: if hasattr(returners, "__getitem__"): self.returners = returners else: self.returners = returners.loader.gen_functions() try: self.time_offset = self.functions.get( "timezone.get_offset", lambda: "0000" )() except Exception: # pylint: disable=W0703 # get_offset can fail, if that happens, default to 0000 log.warning( "Unable to obtain correct timezone offset, defaulting to 0000", exc_info_on_loglevel=logging.DEBUG, ) self.time_offset = "0000" self.schedule_returner = self.option("schedule_returner") # Keep track of the lowest loop interval needed in this variable self.loop_interval = sys.maxsize if not self.standalone: clean_proc_dir(opts) if cleanup: for prefix in cleanup: self.delete_job_prefix(prefix) if _subprocess_list is None: self._subprocess_list = 
salt.utils.process.SubprocessList() else: self._subprocess_list = _subprocess_list def __getnewargs__(self): return self.opts, self.functions, self.returners, self.intervals, None def option(self, opt): """ Return options merged from config and pillar """ if "config.merge" in self.functions: return self.functions["config.merge"](opt, {}, omit_master=True) return self.opts.get(opt, {}) def _get_schedule( self, include_opts=True, include_pillar=True, remove_hidden=False ): """ Return the schedule data structure """ schedule = {} if include_pillar: pillar_schedule = self.opts.get("pillar", {}).get("schedule", {}) if not isinstance(pillar_schedule, dict): raise ValueError("Schedule must be of type dict.") schedule.update(pillar_schedule) if include_opts: opts_schedule = self.opts.get("schedule", {}) if not isinstance(opts_schedule, dict): raise ValueError("Schedule must be of type dict.") schedule.update(opts_schedule) if remove_hidden: _schedule = copy.deepcopy(schedule) for job in schedule: if isinstance(schedule[job], dict): for item in schedule[job]: if item.startswith("_"): del _schedule[job][item] return _schedule return schedule def _check_max_running(self, func, data, opts, now): """ Return the schedule data structure """ # Check to see if there are other jobs with this # signature running. If there are more than maxrunning # jobs present then don't start another. 
# If jid_include is False for this job we can ignore all this # NOTE--jid_include defaults to True, thus if it is missing from the data # dict we treat it like it was there and is True # Check if we're able to run if "run" not in data or not data["run"]: return data if "jid_include" not in data or data["jid_include"]: jobcount = 0 if self.opts["__role"] == "master": current_jobs = salt.utils.master.get_running_jobs(self.opts) else: current_jobs = salt.utils.minion.running(self.opts) for job in current_jobs: if "schedule" in job: log.debug( "schedule.handle_func: Checking job against fun %s: %s", func, job, ) if data["name"] == job[ "schedule" ] and salt.utils.process.os_is_running(job["pid"]): jobcount += 1 log.debug( "schedule.handle_func: Incrementing jobcount, " "now %s, maxrunning is %s", jobcount, data["maxrunning"], ) if jobcount >= data["maxrunning"]: log.debug( "schedule.handle_func: The scheduled job " "%s was not started, %s already running", data["name"], data["maxrunning"], ) data["_skip_reason"] = "maxrunning" data["_skipped"] = True data["_skipped_time"] = now data["run"] = False return data return data def persist(self): """ Persist the modified schedule into <<configdir>>/<<default_include>>/_schedule.conf """ config_dir = self.opts.get("conf_dir", None) if config_dir is None and "conf_file" in self.opts: config_dir = os.path.dirname(self.opts["conf_file"]) if config_dir is None: config_dir = salt.syspaths.CONFIG_DIR minion_d_dir = os.path.join( config_dir, os.path.dirname( self.opts.get( "default_include", salt.config.DEFAULT_MINION_OPTS["default_include"], ) ), ) if not os.path.isdir(minion_d_dir): os.makedirs(minion_d_dir) schedule_conf = os.path.join(minion_d_dir, "_schedule.conf") log.debug("Persisting schedule") schedule_data = self._get_schedule(include_pillar=False, remove_hidden=True) try: with salt.utils.files.fopen(schedule_conf, "wb+") as fp_: fp_.write( salt.utils.stringutils.to_bytes( salt.utils.yaml.safe_dump({"schedule": 
schedule_data}) ) ) except OSError: log.error( "Failed to persist the updated schedule", exc_info_on_loglevel=logging.DEBUG, ) def delete_job(self, name, persist=True, fire_event=True): """ Deletes a job from the scheduler. Ignore jobs from pillar """ # ensure job exists, then delete it if name in self.opts["schedule"]: del self.opts["schedule"][name] elif name in self._get_schedule(include_opts=False): log.warning("Cannot delete job %s, it's in the pillar!", name) if fire_event: # Fire the complete event back along with updated list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "schedule": self._get_schedule()}, tag="/salt/minion/minion_schedule_delete_complete", ) # remove from self.intervals if name in self.intervals: del self.intervals[name] if persist: self.persist() def reset(self): """ Reset the scheduler to defaults """ self.skip_function = None self.skip_during_range = None self.enabled = True self.splay = None self.opts["schedule"] = {} def delete_job_prefix(self, name, persist=True, fire_event=True): """ Deletes a job from the scheduler. Ignores jobs from pillar """ # ensure job exists, then delete it for job in list(self.opts["schedule"].keys()): if job.startswith(name): del self.opts["schedule"][job] for job in self._get_schedule(include_opts=False): if job.startswith(name): log.warning("Cannot delete job %s, it's in the pillar!", job) if fire_event: # Fire the complete event back along with updated list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "schedule": self._get_schedule()}, tag="/salt/minion/minion_schedule_delete_complete", ) # remove from self.intervals for job in list(self.intervals.keys()): if job.startswith(name): del self.intervals[job] if persist: self.persist() def add_job(self, data, persist=True, fire_event=True): """ Adds a new job to the scheduler. 
The format is the same as required in the configuration file. See the docs on how YAML is interpreted into python data-structures to make sure, you pass correct dictionaries. """ # we don't do any checking here besides making sure its a dict. # eval() already does for us and raises errors accordingly if not isinstance(data, dict): raise ValueError("Scheduled jobs have to be of type dict.") if not len(data) == 1: raise ValueError("You can only schedule one new job at a time.") # if enabled is not included in the job, # assume job is enabled. for job in data: if "enabled" not in data[job]: data[job]["enabled"] = True new_job = next(iter(data.keys())) if new_job in self._get_schedule(include_opts=False): log.warning("Cannot update job %s, it's in the pillar!", new_job) elif new_job in self.opts["schedule"]: log.info("Updating job settings for scheduled job: %s", new_job) self.opts["schedule"].update(data) else: log.info("Added new job %s to scheduler", new_job) self.opts["schedule"].update(data) # Fire the complete event back along with updated list of schedule if fire_event: with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "schedule": self._get_schedule()}, tag="/salt/minion/minion_schedule_add_complete", ) if persist: self.persist() def enable_job(self, name, persist=True, fire_event=True): """ Enable a job in the scheduler. 
Ignores jobs from pillar """ # ensure job exists, then enable it if name in self.opts["schedule"]: self.opts["schedule"][name]["enabled"] = True log.info("Enabling job %s in scheduler", name) elif name in self._get_schedule(include_opts=False): log.warning("Cannot modify job %s, it's in the pillar!", name) if fire_event: # Fire the complete event back along with updated list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "schedule": self._get_schedule()}, tag="/salt/minion/minion_schedule_enabled_job_complete", ) if persist: self.persist() def disable_job(self, name, persist=True, fire_event=True): """ Disable a job in the scheduler. Ignores jobs from pillar """ # ensure job exists, then disable it if name in self.opts["schedule"]: self.opts["schedule"][name]["enabled"] = False log.info("Disabling job %s in scheduler", name) elif name in self._get_schedule(include_opts=False): log.warning("Cannot modify job %s, it's in the pillar!", name) if fire_event: with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: # Fire the complete event back along with updated list of schedule evt.fire_event( {"complete": True, "schedule": self._get_schedule()}, tag="/salt/minion/minion_schedule_disabled_job_complete", ) if persist: self.persist() def modify_job(self, name, schedule, persist=True, fire_event=True): """ Modify a job in the scheduler. 
Ignores jobs from pillar """ # ensure job exists, then replace it if name in self.opts["schedule"]: self.delete_job(name, persist, fire_event) elif name in self._get_schedule(include_opts=False): log.warning("Cannot modify job %s, it's in the pillar!", name) return self.opts["schedule"][name] = schedule if persist: self.persist() def run_job(self, name): """ Run a schedule job now """ data = self._get_schedule().get(name, {}) if "function" in data: func = data["function"] elif "func" in data: func = data["func"] elif "fun" in data: func = data["fun"] else: func = None if func not in self.functions: log.info("Invalid function: %s in scheduled job %s.", func, name) if "name" not in data: data["name"] = name # Assume run should be True until we check max_running if "run" not in data: data["run"] = True if not self.standalone: data = self._check_max_running( func, data, self.opts, datetime.datetime.now() ) # Grab run, assume True if data.get("run"): log.info("Running Job: %s", name) self._run_job(func, data) def enable_schedule(self, persist=True, fire_event=True): """ Enable the scheduler. """ self.opts["schedule"]["enabled"] = True if fire_event: # Fire the complete event back along with updated list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "schedule": self._get_schedule()}, tag="/salt/minion/minion_schedule_enabled_complete", ) if persist: self.persist() def disable_schedule(self, persist=True, fire_event=True): """ Disable the scheduler. """ self.opts["schedule"]["enabled"] = False if fire_event: # Fire the complete event back along with updated list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "schedule": self._get_schedule()}, tag="/salt/minion/minion_schedule_disabled_complete", ) if persist: self.persist() def reload(self, schedule): """ Reload the schedule from saved schedule file. 
""" # Remove all jobs from self.intervals self.intervals = {} if "schedule" in schedule: schedule = schedule["schedule"] self.opts.setdefault("schedule", {}).update(schedule) def list(self, where, fire_event=True): """ List the current schedule items """ if where == "pillar": schedule = self._get_schedule(include_opts=False) elif where == "opts": schedule = self._get_schedule(include_pillar=False) else: schedule = self._get_schedule() if fire_event: # Fire the complete event back along with the list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "schedule": schedule}, tag="/salt/minion/minion_schedule_list_complete", ) def save_schedule(self, fire_event=True): """ Save the current schedule """ self.persist() if fire_event: # Fire the complete event back along with the list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True}, tag="/salt/minion/minion_schedule_saved" ) def postpone_job(self, name, data, fire_event=True): """ Postpone a job in the scheduler. 
Ignores jobs from pillar """ time = data["time"] new_time = data["new_time"] time_fmt = data.get("time_fmt", "%Y-%m-%dT%H:%M:%S") # ensure job exists, then disable it if name in self.opts["schedule"]: if "skip_explicit" not in self.opts["schedule"][name]: self.opts["schedule"][name]["skip_explicit"] = [] self.opts["schedule"][name]["skip_explicit"].append( {"time": time, "time_fmt": time_fmt} ) if "run_explicit" not in self.opts["schedule"][name]: self.opts["schedule"][name]["run_explicit"] = [] self.opts["schedule"][name]["run_explicit"].append( {"time": new_time, "time_fmt": time_fmt} ) elif name in self._get_schedule(include_opts=False): log.warning("Cannot modify job %s, it's in the pillar!", name) if fire_event: # Fire the complete event back along with updated list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "schedule": self._get_schedule()}, tag="/salt/minion/minion_schedule_postpone_job_complete", ) def skip_job(self, name, data, fire_event=True): """ Skip a job at a specific time in the scheduler. 
Ignores jobs from pillar """ time = data["time"] time_fmt = data.get("time_fmt", "%Y-%m-%dT%H:%M:%S") # ensure job exists, then disable it if name in self.opts["schedule"]: if "skip_explicit" not in self.opts["schedule"][name]: self.opts["schedule"][name]["skip_explicit"] = [] self.opts["schedule"][name]["skip_explicit"].append( {"time": time, "time_fmt": time_fmt} ) elif name in self._get_schedule(include_opts=False): log.warning("Cannot modify job %s, it's in the pillar!", name) if fire_event: # Fire the complete event back along with updated list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "schedule": self._get_schedule()}, tag="/salt/minion/minion_schedule_skip_job_complete", ) def get_next_fire_time(self, name, fmt="%Y-%m-%dT%H:%M:%S", fire_event=True): """ Return the next fire time for the specified job """ schedule = self._get_schedule() _next_fire_time = None if schedule: _next_fire_time = schedule.get(name, {}).get("_next_fire_time", None) if _next_fire_time: _next_fire_time = _next_fire_time.strftime(fmt) if fire_event: # Fire the complete event back along with updated list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "next_fire_time": _next_fire_time}, tag="/salt/minion/minion_schedule_next_fire_time_complete", ) def job_status(self, name, fire_event=False): """ Return the specified schedule item """ if fire_event: schedule = self._get_schedule() data = schedule.get(name, {}) # Fire the complete event back along with updated list of schedule with salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) as evt: evt.fire_event( {"complete": True, "data": data}, tag="/salt/minion/minion_schedule_job_status_complete", ) else: schedule = self._get_schedule() return schedule.get(name, {}) def handle_func(self, multiprocessing_enabled, func, data, jid=None): """ Execute this method 
in a multiprocess or thread """ if ( salt.utils.platform.spawning_platform() or self.opts.get("transport") == "zeromq" ): # Since function references can't be pickled and pickling # is required when spawning new processes on spawning platforms, regenerate # the functions and returners. # This also needed for ZeroMQ transport to reset all functions # context data that could keep paretns connections. ZeroMQ will # hang on polling parents connections from the child process. self.utils = salt.loader.utils(self.opts) if self.opts["__role"] == "master": self.functions = salt.loader.runner(self.opts, utils=self.utils) else: self.functions = salt.loader.minion_mods( self.opts, proxy=self.proxy, utils=self.utils ) self.returners = salt.loader.returners( self.opts, self.functions, proxy=self.proxy ) if jid is None: jid = salt.utils.jid.gen_jid(self.opts) ret = { "id": self.opts.get("id", "master"), "fun": func, "fun_args": [], "schedule": data["name"], "jid": jid, } if "metadata" in data: if isinstance(data["metadata"], dict): ret["metadata"] = data["metadata"] ret["metadata"]["_TOS"] = self.time_offset ret["metadata"]["_TS"] = time.ctime() ret["metadata"]["_TT"] = time.strftime( "%Y %B %d %a %H %m", time.gmtime() ) else: log.warning( "schedule: The metadata parameter must be " "specified as a dictionary. Ignoring." ) data_returner = data.get("returner", None) if not self.standalone: proc_fn = os.path.join( salt.minion.get_proc_dir(self.opts["cachedir"]), ret["jid"] ) # TODO: Make it readable! Splt to funcs, remove nested try-except-finally sections. try: minion_blackout_violation = False if self.opts.get("pillar", {}).get("minion_blackout", False): whitelist = self.opts.get("pillar", {}).get( "minion_blackout_whitelist", [] ) # this minion is blacked out. 
Only allow saltutil.refresh_pillar and the whitelist if func != "saltutil.refresh_pillar" and func not in whitelist: minion_blackout_violation = True elif self.opts.get("grains", {}).get("minion_blackout", False): whitelist = self.opts.get("grains", {}).get( "minion_blackout_whitelist", [] ) if func != "saltutil.refresh_pillar" and func not in whitelist: minion_blackout_violation = True if minion_blackout_violation: raise SaltInvocationError( "Minion in blackout mode. Set 'minion_blackout' " "to False in pillar or grains to resume operations. Only " "saltutil.refresh_pillar allowed in blackout mode." ) ret["pid"] = os.getpid() args = tuple() if "args" in data: args = copy.deepcopy(data["args"]) ret["fun_args"].extend(data["args"]) kwargs = {} if "kwargs" in data: kwargs = copy.deepcopy(data["kwargs"]) ret["fun_args"].append(copy.deepcopy(kwargs)) if func not in self.functions: ret["return"] = self.functions.missing_fun_string(func) salt.utils.error.raise_error( message=self.functions.missing_fun_string(func) ) if not self.standalone: if "jid_include" not in data or data["jid_include"]: log.debug( "schedule.handle_func: adding this job to the " "jobcache with data %s", ret, ) # write this to /var/cache/salt/minion/proc with salt.utils.files.fopen(proc_fn, "w+b") as fp_: fp_.write(salt.payload.dumps(ret)) # if the func support **kwargs, lets pack in the pub data we have # TODO: pack the *same* pub data as a minion? 
argspec = salt.utils.args.get_function_argspec(self.functions[func]) if argspec.keywords: # this function accepts **kwargs, pack in the publish data for key, val in ret.items(): if key != "kwargs": kwargs["__pub_{}".format(key)] = copy.deepcopy(val) # Only include these when running runner modules if self.opts["__role"] == "master": jid = salt.utils.jid.gen_jid(self.opts) tag = salt.utils.event.tagify(jid, prefix="salt/scheduler/") namespaced_event = salt.utils.event.NamespacedEvent( salt.utils.event.get_event( self.opts["__role"], self.opts["sock_dir"], opts=self.opts, listen=False, ), tag, print_func=None, ) func_globals = { "__jid__": jid, "__user__": salt.utils.user.get_user(), "__tag__": tag, "__jid_event__": weakref.proxy(namespaced_event), } self_functions = copy.copy(self.functions) salt.utils.lazy.verify_fun(self_functions, func) # Inject some useful globals to *all* the function's global # namespace only once per module-- not per func completed_funcs = [] for mod_name in self_functions.keys(): if "." 
not in mod_name: continue mod, _ = mod_name.split(".", 1) if mod in completed_funcs: continue completed_funcs.append(mod) for global_key, value in func_globals.items(): self.functions[mod_name].__globals__[global_key] = value self.functions.pack["__context__"]["retcode"] = 0 ret["return"] = self.functions[func](*args, **kwargs) if not self.standalone: # runners do not provide retcode if "retcode" in self.functions.pack["__context__"]: ret["retcode"] = self.functions.pack["__context__"]["retcode"] ret["success"] = True if data_returner or self.schedule_returner: if "return_config" in data: ret["ret_config"] = data["return_config"] if "return_kwargs" in data: ret["ret_kwargs"] = data["return_kwargs"] rets = [] for returner in [data_returner, self.schedule_returner]: if isinstance(returner, str): rets.append(returner) elif isinstance(returner, list): rets.extend(returner) # simple de-duplication with order retained for returner in OrderedDict.fromkeys(rets): ret_str = "{}.returner".format(returner) if ret_str in self.returners: self.returners[ret_str](ret) else: log.info( "Job %s using invalid returner: %s. Ignoring.", func, returner, ) except Exception: # pylint: disable=broad-except log.exception("Unhandled exception running %s", ret["fun"]) # Although catch-all exception handlers are bad, the exception here # is to let the exception bubble up to the top of the thread context, # where the thread will die silently, which is worse. if "return" not in ret: ret["return"] = "Unhandled exception running {}".format(ret["fun"]) ret["success"] = False ret["retcode"] = 254 finally: # Only attempt to return data to the master if the scheduled job is running # on a master itself or a minion. 
if "__role" in self.opts and self.opts["__role"] in ("master", "minion"): # The 'return_job' option is enabled by default even if not set if "return_job" in data and not data["return_job"]: pass else: # Send back to master so the job is included in the job list mret = ret.copy() # No returners defined, so we're only sending back to the master if not data_returner and not self.schedule_returner: mret["jid"] = "req" if data.get("return_job") == "nocache": # overwrite 'req' to signal to master that # this job shouldn't be stored mret["jid"] = "nocache" load = {"cmd": "_return", "id": self.opts["id"]} for key, value in mret.items(): load[key] = value if "__role" in self.opts and self.opts["__role"] == "minion": event = salt.utils.event.get_event( "minion", opts=self.opts, listen=False ) elif "__role" in self.opts and self.opts["__role"] == "master": event = salt.utils.event.get_master_event( self.opts, self.opts["sock_dir"] ) try: event.fire_event(load, "__schedule_return") except Exception as exc: # pylint: disable=broad-except log.exception( "Unhandled exception firing __schedule_return event" ) finally: event.destroy() if self.opts["__role"] == "master": namespaced_event.destroy() if not self.standalone: log.debug("schedule.handle_func: Removing %s", proc_fn) try: os.unlink(proc_fn) except OSError as exc: if exc.errno == errno.EEXIST or exc.errno == errno.ENOENT: # EEXIST and ENOENT are OK because the file is gone and that's what # we wanted pass else: log.error("Failed to delete '%s': %s", proc_fn, exc.errno) # Otherwise, failing to delete this file is not something # we can cleanly handle. raise finally: if multiprocessing_enabled: # Let's make sure we exit the process! 
def eval(self, now=None):
    """
    Evaluate and execute the schedule

    Walks every job returned by ``self._get_schedule()``, works out whether
    the job is due (time elements, ``once``, ``when``, ``cron``,
    ``run_explicit``), applies splay and the skip/range/until/after filters
    and finally starts the jobs that are due through ``self._run_job``.
    Book-keeping keys (``_next_fire_time``, ``_splay``, ``_last_run``,
    ``_skipped`` ...) are written back into each job's data dict.

    :param datetime now: Override current time with a datetime object instance
    :return: list with the jids generated for the jobs started by this call
    :raises ValueError: if the schedule is not a dict
    """

    log.trace("==== evaluating schedule now %s =====", now)

    jids = []
    loop_interval = self.opts["loop_interval"]
    if not isinstance(loop_interval, datetime.timedelta):
        loop_interval = datetime.timedelta(seconds=loop_interval)

    def _splay(splaytime):
        """
        Calculate splaytime

        Returns a random number of seconds, or None when a splay dict has
        its ``end`` before its ``start``.
        """
        splay_ = None
        if isinstance(splaytime, dict):
            if splaytime["end"] >= splaytime["start"]:
                splay_ = random.randint(splaytime["start"], splaytime["end"])
            else:
                log.error(
                    "schedule.handle_func: Invalid Splay, "
                    "end must be larger than start. Ignoring splay."
                )
        else:
            splay_ = random.randint(1, splaytime)
        return splay_

    def _handle_time_elements(data):
        """
        Handle schedule item with time elements
        seconds, minutes, hours, days
        """
        if "_seconds" not in data:
            interval = int(data.get("seconds", 0))
            interval += int(data.get("minutes", 0)) * 60
            interval += int(data.get("hours", 0)) * 3600
            interval += int(data.get("days", 0)) * 86400

            data["_seconds"] = interval

            if not data["_next_fire_time"]:
                data["_next_fire_time"] = now + datetime.timedelta(
                    seconds=data["_seconds"]
                )

            if interval < self.loop_interval:
                self.loop_interval = interval

            data["_next_scheduled_fire_time"] = now + datetime.timedelta(
                seconds=data["_seconds"]
            )

    def _handle_once(data, loop_interval):
        """
        Handle schedule item with once
        """
        if data["_next_fire_time"]:
            if data["_next_fire_time"] < now - loop_interval or (
                data["_next_fire_time"] > now and not data["_splay"]
            ):
                data["_continue"] = True

        if not data["_next_fire_time"] and not data["_splay"]:
            once = data["once"]
            if not isinstance(once, datetime.datetime):
                once_fmt = data.get("once_fmt", "%Y-%m-%dT%H:%M:%S")
                try:
                    once = datetime.datetime.strptime(data["once"], once_fmt)
                except (TypeError, ValueError):
                    data["_error"] = (
                        "Date string could not "
                        "be parsed: {}, {}. "
                        "Ignoring job {}.".format(
                            data["once"], once_fmt, data["name"]
                        )
                    )
                    log.error(data["_error"])
                    return
            data["_next_fire_time"] = once
            data["_next_scheduled_fire_time"] = once
            # If _next_fire_time is less than now, continue
            if once < now - loop_interval:
                data["_continue"] = True

    def _handle_when(data, loop_interval):
        """
        Handle schedule item with when

        Reads the enclosing loop's ``run`` flag, so it must only be called
        from inside the per-job loop below.
        """
        if not _WHEN_SUPPORTED:
            data["_error"] = "Missing python-dateutil. Ignoring job {}.".format(
                data["name"]
            )
            log.error(data["_error"])
            return

        if not isinstance(data["when"], list):
            _when_data = [data["when"]]
        else:
            _when_data = data["when"]

        _when = []
        for i in _when_data:
            # A "when" entry may be the name of a pillar/grain "whens" item
            if (
                "pillar" in self.opts
                and "whens" in self.opts["pillar"]
                and i in self.opts["pillar"]["whens"]
            ):
                if not isinstance(self.opts["pillar"]["whens"], dict):
                    data["_error"] = (
                        'Pillar item "whens" '
                        "must be a dict. "
                        "Ignoring job {}.".format(data["name"])
                    )
                    log.error(data["_error"])
                    return
                when_ = self.opts["pillar"]["whens"][i]
            elif (
                "grains" in self.opts
                and "whens" in self.opts["grains"]
                and i in self.opts["grains"]["whens"]
            ):
                if not isinstance(self.opts["grains"]["whens"], dict):
                    data[
                        "_error"
                    ] = 'Grain "whens" must be a dict. Ignoring job {}.'.format(
                        data["name"]
                    )
                    log.error(data["_error"])
                    return
                when_ = self.opts["grains"]["whens"][i]
            else:
                when_ = i

            if not isinstance(when_, datetime.datetime):
                try:
                    when_ = dateutil_parser.parse(when_)
                except ValueError:
                    data[
                        "_error"
                    ] = "Invalid date string {}. Ignoring job {}.".format(
                        i, data["name"]
                    )
                    log.error(data["_error"])
                    return

            _when.append(when_)

        if data["_splay"]:
            _when.append(data["_splay"])

        # Sort the list of "whens" from earlier to later schedules
        _when.sort()

        # Iterate over a copy because entries are removed from the list
        for i in list(_when):
            if len(_when) > 1:
                if i < now - loop_interval:
                    # Remove all missed schedules except the latest one.
                    # We need it to detect if it was triggered previously.
                    _when.remove(i)

        if _when:
            # Grab the first element, which is the next run time or
            # last scheduled time in the past.
            when = _when[0]

            if (
                when < now - loop_interval
                and not data.get("_run", False)
                and not run
                and not data["_splay"]
            ):
                data["_next_fire_time"] = None
                data["_continue"] = True
                return

            if "_run" not in data:
                # Prevent run of jobs from the past
                data["_run"] = bool(when >= now - loop_interval)

            if not data["_next_fire_time"]:
                data["_next_fire_time"] = when

            data["_next_scheduled_fire_time"] = when

            if data["_next_fire_time"] < when and not run and not data["_run"]:
                data["_next_fire_time"] = when
                data["_run"] = True

        elif not data.get("_run", False):
            data["_next_fire_time"] = None
            data["_continue"] = True

    def _handle_cron(data, loop_interval):
        """
        Handle schedule item with cron
        """
        if not _CRON_SUPPORTED:
            data["_error"] = "Missing python-croniter. Ignoring job {}.".format(
                data["name"]
            )
            log.error(data["_error"])
            return

        if data["_next_fire_time"] is None:
            # Get next time frame for a "cron" job if it has been never
            # executed before or already executed in the past.
            try:
                next_fire = croniter.croniter(data["cron"], now).get_next(
                    datetime.datetime
                )
            except (ValueError, KeyError):
                data["_error"] = "Invalid cron string. Ignoring job {}.".format(
                    data["name"]
                )
                log.error(data["_error"])
                return
            data["_next_fire_time"] = next_fire
            data["_next_scheduled_fire_time"] = next_fire

            # If next job run is scheduled more than 1 minute ahead and
            # configured loop interval is longer than that, we should
            # shorten it to get our job executed closer to the beginning
            # of desired time.
            # NOTE(review): ``now - _next_fire_time`` is presumably never
            # positive for a freshly computed next run, so this branch looks
            # unreachable; the operands may be swapped. Left as-is because
            # activating it changes loop timing -- confirm before fixing.
            interval = (now - data["_next_fire_time"]).total_seconds()
            if interval >= 60 and interval < self.loop_interval:
                self.loop_interval = interval

    def _handle_run_explicit(data, loop_interval):
        """
        Handle schedule item with run_explicit
        """
        _run_explicit = []
        for _run_time in data["run_explicit"]:
            if isinstance(_run_time, datetime.datetime):
                _run_explicit.append(_run_time)
            else:
                _run_explicit.append(
                    datetime.datetime.strptime(
                        _run_time["time"], _run_time["time_fmt"]
                    )
                )
        data["run"] = False

        # Iterate over a copy because entries are removed from the list
        for i in list(_run_explicit):
            if len(_run_explicit) > 1:
                if i < now - loop_interval:
                    _run_explicit.remove(i)

        if _run_explicit:
            if _run_explicit[0] <= now < _run_explicit[0] + loop_interval:
                data["run"] = True
                data["_next_fire_time"] = _run_explicit[0]

    def _handle_skip_explicit(data, loop_interval):
        """
        Handle schedule item with skip_explicit

        The job is skipped (or replaced by ``self.skip_function``) only while
        ``now`` falls inside the window of one of the explicit skip times;
        in every other case the job is allowed to run.
        """
        _skip_explicit = []
        for _skip_time in data["skip_explicit"]:
            if isinstance(_skip_time, datetime.datetime):
                _skip_explicit.append(_skip_time)
            else:
                _skip_explicit.append(
                    datetime.datetime.strptime(
                        _skip_time["time"], _skip_time["time_fmt"]
                    )
                )

        # Skip times that are already in the past are irrelevant, and every
        # remaining entry is checked, not only the first one in the list.
        in_skip_window = any(
            skip_time <= now <= (skip_time + loop_interval)
            for skip_time in _skip_explicit
            if not skip_time < now - loop_interval
        )

        if not in_skip_window:
            data["run"] = True
        elif self.skip_function:
            data["run"] = True
            data["func"] = self.skip_function
        else:
            data["_skip_reason"] = "skip_explicit"
            data["_skipped_time"] = now
            data["_skipped"] = True
            data["run"] = False

    def _handle_skip_during_range(data, loop_interval):
        """
        Handle schedule item with skip_during_range
        """
        if not _RANGE_SUPPORTED:
            data["_error"] = "Missing python-dateutil. Ignoring job {}.".format(
                data["name"]
            )
            log.error(data["_error"])
            return

        if not isinstance(data["skip_during_range"], dict):
            data["_error"] = (
                "schedule.handle_func: Invalid, range "
                "must be specified as a dictionary. "
                "Ignoring job {}.".format(data["name"])
            )
            log.error(data["_error"])
            return

        start = data["skip_during_range"]["start"]
        end = data["skip_during_range"]["end"]
        if not isinstance(start, datetime.datetime):
            try:
                start = dateutil_parser.parse(start)
            except ValueError:
                data["_error"] = (
                    "Invalid date string for start in "
                    "skip_during_range. Ignoring "
                    "job {}.".format(data["name"])
                )
                log.error(data["_error"])
                return

        if not isinstance(end, datetime.datetime):
            try:
                end = dateutil_parser.parse(end)
            except ValueError:
                data["_error"] = (
                    "Invalid date string for end in "
                    "skip_during_range. Ignoring "
                    "job {}.".format(data["name"])
                )
                log.error(data["_error"])
                return

        # Check to see if we should run the job immediately
        # after the skip_during_range is over
        if "run_after_skip_range" in data and data["run_after_skip_range"]:
            if "run_explicit" not in data:
                data["run_explicit"] = []
            # Add a run_explicit for immediately after the
            # skip_during_range ends
            _run_immediate = {
                "time": (end + loop_interval).strftime("%Y-%m-%dT%H:%M:%S"),
                "time_fmt": "%Y-%m-%dT%H:%M:%S",
            }
            # Compare the whole entry: run_explicit holds dicts, so testing
            # the bare time string would always append a duplicate.
            if _run_immediate not in data["run_explicit"]:
                data["run_explicit"].append(_run_immediate)

        if end > start:
            if start <= now <= end:
                if self.skip_function:
                    data["run"] = True
                    data["func"] = self.skip_function
                else:
                    data["_skip_reason"] = "in_skip_range"
                    data["_skipped_time"] = now
                    data["_skipped"] = True
                    data["run"] = False
            else:
                data["run"] = True
        else:
            data["_error"] = (
                "schedule.handle_func: Invalid "
                "range, end must be larger than "
                "start. Ignoring job {}.".format(data["name"])
            )
            log.error(data["_error"])

    def _handle_range(data):
        """
        Handle schedule item with range
        """
        if not _RANGE_SUPPORTED:
            data["_error"] = "Missing python-dateutil. Ignoring job {}".format(
                data["name"]
            )
            log.error(data["_error"])
            return

        if not isinstance(data["range"], dict):
            data["_error"] = (
                "schedule.handle_func: Invalid, range "
                "must be specified as a dictionary. "
                "Ignoring job {}.".format(data["name"])
            )
            log.error(data["_error"])
            return

        start = data["range"]["start"]
        end = data["range"]["end"]
        if not isinstance(start, datetime.datetime):
            try:
                start = dateutil_parser.parse(start)
            except ValueError:
                data[
                    "_error"
                ] = "Invalid date string for start. Ignoring job {}.".format(
                    data["name"]
                )
                log.error(data["_error"])
                return

        if not isinstance(end, datetime.datetime):
            try:
                end = dateutil_parser.parse(end)
            except ValueError:
                data[
                    "_error"
                ] = "Invalid date string for end. Ignoring job {}.".format(
                    data["name"]
                )
                log.error(data["_error"])
                return

        if end > start:
            if "invert" in data["range"] and data["range"]["invert"]:
                if now <= start or now >= end:
                    data["run"] = True
                else:
                    data["_skip_reason"] = "in_skip_range"
                    data["run"] = False
            else:
                if start <= now <= end:
                    data["run"] = True
                else:
                    if self.skip_function:
                        data["run"] = True
                        data["func"] = self.skip_function
                    else:
                        data["_skip_reason"] = "not_in_range"
                        data["run"] = False
        else:
            data["_error"] = (
                "schedule.handle_func: Invalid "
                "range, end must be larger "
                "than start. Ignoring job {}.".format(data["name"])
            )
            log.error(data["_error"])

    def _handle_after(data):
        """
        Handle schedule item with after
        """
        if not _WHEN_SUPPORTED:
            data["_error"] = "Missing python-dateutil. Ignoring job {}".format(
                data["name"]
            )
            log.error(data["_error"])
            return

        after = data["after"]
        if not isinstance(after, datetime.datetime):
            try:
                after = dateutil_parser.parse(after)
            except ValueError:
                # Same policy as the other handlers: flag the job, keep
                # evaluating the rest of the schedule.
                data[
                    "_error"
                ] = "Invalid date string for after. Ignoring job {}.".format(
                    data["name"]
                )
                log.error(data["_error"])
                return

        if after >= now:
            log.debug("After time has not passed skipping job: %s.", data["name"])
            data["_skip_reason"] = "after_not_passed"
            data["_skipped_time"] = now
            data["_skipped"] = True
            data["run"] = False
        else:
            data["run"] = True

    def _handle_until(data):
        """
        Handle schedule item with until
        """
        if not _WHEN_SUPPORTED:
            data["_error"] = "Missing python-dateutil. Ignoring job {}".format(
                data["name"]
            )
            log.error(data["_error"])
            return

        until = data["until"]
        if not isinstance(until, datetime.datetime):
            try:
                until = dateutil_parser.parse(until)
            except ValueError:
                # Same policy as the other handlers: flag the job, keep
                # evaluating the rest of the schedule.
                data[
                    "_error"
                ] = "Invalid date string for until. Ignoring job {}.".format(
                    data["name"]
                )
                log.error(data["_error"])
                return

        if until <= now:
            log.debug("Until time has passed skipping job: %s.", data["name"])
            data["_skip_reason"] = "until_passed"
            data["_skipped_time"] = now
            data["_skipped"] = True
            data["run"] = False
        else:
            data["run"] = True

    def _chop_ms(dt):
        """
        Remove the microseconds from a datetime object
        """
        return dt - datetime.timedelta(microseconds=dt.microsecond)

    schedule = self._get_schedule()
    if not isinstance(schedule, dict):
        raise ValueError("Schedule must be of type dict.")
    if "skip_function" in schedule:
        self.skip_function = schedule["skip_function"]
    if "skip_during_range" in schedule:
        self.skip_during_range = schedule["skip_during_range"]
    if "enabled" in schedule:
        self.enabled = schedule["enabled"]
    if "splay" in schedule:
        self.splay = schedule["splay"]

    if not now:
        now = datetime.datetime.now()

    time_elements = ("seconds", "minutes", "hours", "days")
    scheduling_elements = ("when", "cron", "once")

    _hidden = ["enabled", "skip_function", "skip_during_range", "splay"]
    for job, data in schedule.items():

        # Skip anything that is a global setting
        if job in _hidden:
            continue

        # Validate the value before touching it: everything below indexes
        # and mutates ``data`` as a dict.
        if not isinstance(data, dict):
            log.error(
                'Scheduled job "%s" should have a dict value, not %s',
                job,
                type(data),
            )
            continue

        # Clear these out between runs
        for item in (
            "_continue",
            "_error",
            "_enabled",
            "_skipped",
            "_skip_reason",
            "_skipped_time",
        ):
            data.pop(item, None)
        run = False

        # The job's key doubles as its name when none is configured
        job_name = data.setdefault("name", job)

        if "function" in data:
            func = data["function"]
        elif "func" in data:
            func = data["func"]
        elif "fun" in data:
            func = data["fun"]
        else:
            func = None

        if func not in self.functions:
            log.info("Invalid function: %s in scheduled job %s.", func, job_name)

        if "_next_fire_time" not in data:
            data["_next_fire_time"] = None

        if "_splay" not in data:
            data["_splay"] = None

        if (
            "run_on_start" in data
            and data["run_on_start"]
            and "_run_on_start" not in data
        ):
            data["_run_on_start"] = True

        # Used for quick lookups when detecting invalid option
        # combinations.
        schedule_keys = set(data)

        invalid_sched_combos = [
            set(i) for i in itertools.combinations(scheduling_elements, 2)
        ]

        if any(i <= schedule_keys for i in invalid_sched_combos):
            log.error(
                'Unable to use "%s" options together. Ignoring.',
                '", "'.join(scheduling_elements),
            )
            continue

        # NOTE(review): each entry built here is a set of 2-tuples, while
        # ``schedule_keys`` holds strings, so the subset test below cannot
        # succeed and this check never rejects a job. Kept unchanged because
        # enforcing it would start rejecting configurations that run today.
        invalid_time_combos = []
        for item in scheduling_elements:
            all_items = itertools.chain([item], time_elements)
            invalid_time_combos.append(set(itertools.combinations(all_items, 2)))

        if any(set(x) <= schedule_keys for x in invalid_time_combos):
            log.error(
                'Unable to use "%s" with "%s" options. Ignoring',
                '", "'.join(time_elements),
                '", "'.join(scheduling_elements),
            )
            continue

        if "run_explicit" in data:
            _handle_run_explicit(data, loop_interval)
            run = data["run"]

        if any(item in data for item in time_elements):
            _handle_time_elements(data)
        elif "once" in data:
            _handle_once(data, loop_interval)
        elif "when" in data:
            _handle_when(data, loop_interval)
        elif "cron" in data:
            _handle_cron(data, loop_interval)
        else:
            continue

        # Something told us to continue, so we continue
        if "_continue" in data and data["_continue"]:
            continue

        # An error occurred so we bail out
        if "_error" in data and data["_error"]:
            continue

        seconds = int(
            (_chop_ms(data["_next_fire_time"]) - _chop_ms(now)).total_seconds()
        )

        # If there is no job specific splay available,
        # grab the global which defaults to None.
        if "splay" not in data:
            data["splay"] = self.splay

        if "splay" in data and data["splay"]:
            # Got "splay" configured, make decision to run a job based on that
            if not data["_splay"]:
                # Try to add "splay" time only if next job fire time is
                # still in the future. We should trigger job run
                # immediately otherwise.
                splay = _splay(data["splay"])
                if splay is None:
                    # Invalid splay (already logged by _splay): ignore it
                    # and let the regular due-time checks decide.
                    pass
                elif now < data["_next_fire_time"] + datetime.timedelta(
                    seconds=splay
                ):
                    log.debug(
                        "schedule.handle_func: Adding splay of "
                        "%s seconds to next run.",
                        splay,
                    )
                    data["_splay"] = data["_next_fire_time"] + datetime.timedelta(
                        seconds=splay
                    )
                    if "when" in data:
                        data["_run"] = True
                else:
                    run = True

            if data["_splay"]:
                # The "splay" configuration has been already processed, just use it
                seconds = (data["_splay"] - now).total_seconds()
                if "when" in data:
                    data["_next_fire_time"] = data["_splay"]

        if "_seconds" in data:
            if seconds <= 0:
                run = True
        elif "when" in data and data["_run"]:
            if (
                data["_next_fire_time"]
                <= now
                <= (data["_next_fire_time"] + loop_interval)
            ):
                data["_run"] = False
                run = True
        elif "cron" in data:
            # Reset next scheduled time because it is in the past now,
            # and we should trigger the job run, then wait for the next one.
            if seconds <= 0:
                data["_next_fire_time"] = None
                run = True
        elif "once" in data:
            if (
                data["_next_fire_time"]
                <= now
                <= (data["_next_fire_time"] + loop_interval)
            ):
                run = True
        elif seconds == 0:
            run = True

        if "_run_on_start" in data and data["_run_on_start"]:
            run = True
            data["_run_on_start"] = False
        elif run:
            if "range" in data:
                _handle_range(data)

                # An error occurred so we bail out
                if "_error" in data and data["_error"]:
                    continue

                run = data["run"]
                # Override the function if passed back
                if "func" in data:
                    func = data["func"]

            # If there is no job specific skip_during_range available,
            # grab the global which defaults to None.
            if "skip_during_range" not in data and self.skip_during_range:
                data["skip_during_range"] = self.skip_during_range

            if "skip_during_range" in data and data["skip_during_range"]:
                _handle_skip_during_range(data, loop_interval)

                # An error occurred so we bail out
                if "_error" in data and data["_error"]:
                    continue

                run = data["run"]
                # Override the function if passed back
                if "func" in data:
                    func = data["func"]

            if "skip_explicit" in data:
                _handle_skip_explicit(data, loop_interval)

                # An error occurred so we bail out
                if "_error" in data and data["_error"]:
                    continue

                run = data["run"]
                # Override the function if passed back
                if "func" in data:
                    func = data["func"]

            if "until" in data:
                _handle_until(data)

                # An error occurred so we bail out
                if "_error" in data and data["_error"]:
                    continue

                run = data["run"]

            if "after" in data:
                _handle_after(data)

                # An error occurred so we bail out
                if "_error" in data and data["_error"]:
                    continue

                run = data["run"]

        # If the job item has continue, then we set run to False
        # so the job does not run but we still get the important
        # information calculated, eg. _next_fire_time
        if "_continue" in data and data["_continue"]:
            run = False

        # If globally disabled or job
        # is disabled skip the job
        if not self.enabled or not data.get("enabled", True):
            log.trace("Job: %s is disabled", job_name)
            data["_skip_reason"] = "disabled"
            data["_skipped_time"] = now
            data["_skipped"] = True
            run = False

        miss_msg = ""
        if seconds < 0:
            miss_msg = " (runtime missed by {} seconds)".format(abs(seconds))

        try:
            if run:
                if "jid_include" not in data or data["jid_include"]:
                    data["jid_include"] = True
                    log.debug(
                        "schedule: Job %s was scheduled with jid_include, "
                        "adding to cache (jid_include defaults to True)",
                        job_name,
                    )
                    if "maxrunning" in data:
                        log.debug(
                            "schedule: Job %s was scheduled with a max "
                            "number of %s",
                            job_name,
                            data["maxrunning"],
                        )
                    else:
                        log.info(
                            "schedule: maxrunning parameter was not specified for "
                            "job %s, defaulting to 1.",
                            job_name,
                        )
                        data["maxrunning"] = 1

                if not self.standalone:
                    data["run"] = run
                    data = self._check_max_running(func, data, self.opts, now)
                    run = data["run"]

            # Check run again, just in case _check_max_running
            # set run to False
            if run:
                jid = salt.utils.jid.gen_jid(self.opts)
                jids.append(jid)
                log.info(
                    "Running scheduled job: %s%s with jid %s",
                    job_name,
                    miss_msg,
                    jid,
                )
                self._run_job(func, data, jid=jid)

        finally:
            # Only set _last_run if the job ran
            if run:
                data["_last_run"] = now
                data["_splay"] = None
            if "_seconds" in data:
                if self.standalone:
                    data["_next_fire_time"] = now + datetime.timedelta(
                        seconds=data["_seconds"]
                    )
                elif "_skipped" in data and data["_skipped"]:
                    # A skipped interval job is re-armed only once its
                    # fire time has actually passed.
                    if data["_next_fire_time"] <= now:
                        data["_next_fire_time"] = now + datetime.timedelta(
                            seconds=data["_seconds"]
                        )
                elif run:
                    data["_next_fire_time"] = now + datetime.timedelta(
                        seconds=data["_seconds"]
                    )
    return jids
def _run_job(self, func, data, jid=None):
    """
    Start a single scheduled job.

    Does nothing but log when the job has ``dry_run`` set. Runs the job
    inline through ``self.handle_func`` when the
    ``run_schedule_jobs_in_background`` option is ``False``. Otherwise the
    job is started in a ``SignalHandlingProcess`` (``multiprocessing``
    option truthy, the default) or in a ``threading.Thread``, and the worker
    is added to ``self._subprocess_list``.

    :param func: name of the function to run, passed on to ``handle_func``
    :param dict data: the job's schedule data
    :param jid: job id passed on to ``handle_func``
    """
    if data.get("dry_run", False):
        log.debug("Job %s has 'dry_run' set to True. Not running it.", data["name"])
        return

    use_multiprocessing = self.opts.get("multiprocessing", True)

    if self.opts.get("run_schedule_jobs_in_background", True) is False:
        # Explicitly pass False for multiprocessing_enabled
        self.handle_func(False, func, data, jid)
        return

    stash_references = (
        use_multiprocessing and salt.utils.platform.spawning_platform()
    )
    if stash_references:
        # Temporarily stash our function references.
        # You can't pickle function references, and pickling is
        # required when spawning new processes on spawning platforms.
        stashed = (self.functions, self.returners, self.utils)
        self.functions = {}
        self.returners = {}
        self.utils = {}

    try:
        worker_kwargs = {
            "target": self.handle_func,
            "args": (use_multiprocessing, func, data, jid),
            "name": "Schedule(name={}, jid={})".format(data["name"], jid),
        }
        if use_multiprocessing:
            with salt.utils.process.default_signals(signal.SIGINT, signal.SIGTERM):
                # Reset current signals before starting the process in
                # order not to inherit the current signal handlers
                worker = salt.utils.process.SignalHandlingProcess(**worker_kwargs)
                worker.start()
                self._subprocess_list.add(worker)
        else:
            worker = threading.Thread(**worker_kwargs)
            worker.start()
            self._subprocess_list.add(worker)
    finally:
        if stash_references:
            # Restore our function references.
            self.functions, self.returners, self.utils = stashed
def cleanup_subprocesses(self):
    """
    Delegate to ``self._subprocess_list.cleanup()``.
    """
    self._subprocess_list.cleanup()


def clean_proc_dir(opts):
    """
    Loop through jid files in the minion proc directory (default
    /var/cache/salt/minion/proc) and remove any that refer to processes
    that no longer exist

    Files whose payload cannot be loaded are treated as corrupted and
    removed as well. Entries that cannot be opened (for example because
    they disappeared after the directory was listed) are skipped.

    :param dict opts: options dict; only ``opts["cachedir"]`` is read here
    """
    # The proc dir does not change while we iterate, so resolve it once
    proc_dir = salt.minion.get_proc_dir(opts["cachedir"])

    for basefilename in os.listdir(proc_dir):
        fn_ = os.path.join(proc_dir, basefilename)
        try:
            fp_ = salt.utils.files.fopen(fn_, "rb")
        except OSError as exc:
            # The file may already be gone (another cleaner, the job
            # itself) or may not be a regular readable file.
            log.debug("schedule.clean_proc_dir: unable to open %s: %s", fn_, exc)
            continue

        with fp_:
            job = None
            try:
                job = salt.payload.load(fp_)
            except Exception:  # pylint: disable=broad-except
                # It's corrupted
                # Windows cannot delete an open file
                if salt.utils.platform.is_windows():
                    fp_.close()
                try:
                    os.unlink(fn_)
                except OSError:
                    # Best effort: nothing more to do for this entry
                    pass
                continue

            log.debug(
                "schedule.clean_proc_dir: checking job %s for process existence", job
            )
            if job is not None and "pid" in job:
                if salt.utils.process.os_is_running(job["pid"]):
                    log.debug(
                        "schedule.clean_proc_dir: Cleaning proc dir, pid %s "
                        "still exists.",
                        job["pid"],
                    )
                else:
                    # Windows cannot delete an open file
                    if salt.utils.platform.is_windows():
                        fp_.close()
                    # Maybe the file is already gone
                    try:
                        os.unlink(fn_)
                    except OSError:
                        pass
Save