golden hour
/opt/saltstack/salt/lib/python3.10/site-packages/salt/utils
⬆️ Go Up
Upload
File/Folder
Size
Actions
__init__.py
237 B
Del
OK
__pycache__
-
Del
OK
aggregation.py
5.17 KB
Del
OK
ansible.py
1.48 KB
Del
OK
args.py
18.33 KB
Del
OK
asynchronous.py
4.06 KB
Del
OK
atomicfile.py
5.33 KB
Del
OK
aws.py
20.37 KB
Del
OK
azurearm.py
11.42 KB
Del
OK
beacons.py
517 B
Del
OK
boto3mod.py
8.36 KB
Del
OK
boto_elb_tag.py
3.02 KB
Del
OK
botomod.py
7.98 KB
Del
OK
cache.py
11.49 KB
Del
OK
channel.py
489 B
Del
OK
cloud.py
116.3 KB
Del
OK
color.py
2.72 KB
Del
OK
compat.py
1.89 KB
Del
OK
configcomparer.py
3.88 KB
Del
OK
configparser.py
10.82 KB
Del
OK
context.py
6.8 KB
Del
OK
crypt.py
5 KB
Del
OK
ctx.py
1.42 KB
Del
OK
data.py
53.03 KB
Del
OK
dateutils.py
2.3 KB
Del
OK
debug.py
4.19 KB
Del
OK
decorators
-
Del
OK
dictdiffer.py
16.48 KB
Del
OK
dicttrim.py
3.9 KB
Del
OK
dictupdate.py
11.33 KB
Del
OK
dns.py
35.21 KB
Del
OK
doc.py
2.25 KB
Del
OK
dockermod
-
Del
OK
entrypoints.py
1.83 KB
Del
OK
environment.py
2.2 KB
Del
OK
error.py
1.18 KB
Del
OK
etcd_util.py
33.25 KB
Del
OK
event.py
52.45 KB
Del
OK
extend.py
8.87 KB
Del
OK
extmods.py
6.04 KB
Del
OK
filebuffer.py
3.15 KB
Del
OK
files.py
27.94 KB
Del
OK
find.py
22.08 KB
Del
OK
fsutils.py
3.29 KB
Del
OK
functools.py
6.02 KB
Del
OK
gitfs.py
130.41 KB
Del
OK
github.py
1.52 KB
Del
OK
gzip_util.py
2.86 KB
Del
OK
hashutils.py
5.91 KB
Del
OK
http.py
33.9 KB
Del
OK
iam.py
1.22 KB
Del
OK
icinga2.py
754 B
Del
OK
idem.py
1.22 KB
Del
OK
immutabletypes.py
2.46 KB
Del
OK
itertools.py
2.36 KB
Del
OK
jid.py
3 KB
Del
OK
jinja.py
33.92 KB
Del
OK
job.py
6.89 KB
Del
OK
json.py
3.78 KB
Del
OK
kickstart.py
41.04 KB
Del
OK
kinds.py
493 B
Del
OK
lazy.py
3.06 KB
Del
OK
listdiffer.py
10.9 KB
Del
OK
locales.py
2.06 KB
Del
OK
mac_utils.py
14.01 KB
Del
OK
mako.py
3.97 KB
Del
OK
master.py
29.7 KB
Del
OK
mattermost.py
1.77 KB
Del
OK
memcached.py
3.56 KB
Del
OK
migrations.py
1.46 KB
Del
OK
mine.py
3.68 KB
Del
OK
minion.py
4.13 KB
Del
OK
minions.py
43.38 KB
Del
OK
mount.py
1.15 KB
Del
OK
msazure.py
5.36 KB
Del
OK
msgpack.py
4.69 KB
Del
OK
nacl.py
13.65 KB
Del
OK
namecheap.py
4.32 KB
Del
OK
napalm.py
23.22 KB
Del
OK
nb_popen.py
7.24 KB
Del
OK
network.py
73.86 KB
Del
OK
nxos.py
12.94 KB
Del
OK
nxos_api.py
4 KB
Del
OK
odict.py
13.21 KB
Del
OK
openstack
-
Del
OK
oset.py
6.41 KB
Del
OK
pagerduty.py
3.03 KB
Del
OK
parsers.py
122.69 KB
Del
OK
path.py
11.24 KB
Del
OK
pbm.py
9.81 KB
Del
OK
pkg
-
Del
OK
platform.py
5.84 KB
Del
OK
powershell.py
4.15 KB
Del
OK
preseed.py
2.64 KB
Del
OK
process.py
40.76 KB
Del
OK
profile.py
3.21 KB
Del
OK
proxy.py
331 B
Del
OK
psutil_compat.py
3.63 KB
Del
OK
pushover.py
4.51 KB
Del
OK
pycrypto.py
5.41 KB
Del
OK
pydsl.py
13.74 KB
Del
OK
pyobjects.py
10.75 KB
Del
OK
reactor.py
18.99 KB
Del
OK
reclass.py
752 B
Del
OK
roster_matcher.py
3.55 KB
Del
OK
rsax931.py
8.42 KB
Del
OK
s3.py
8.78 KB
Del
OK
saltclass.py
14.27 KB
Del
OK
sanitizers.py
2.51 KB
Del
OK
schedule.py
71.81 KB
Del
OK
schema.py
54.26 KB
Del
OK
sdb.py
4.04 KB
Del
OK
slack.py
3.58 KB
Del
OK
smb.py
11.16 KB
Del
OK
smtp.py
3.27 KB
Del
OK
ssdp.py
14.75 KB
Del
OK
ssh.py
769 B
Del
OK
state.py
8.43 KB
Del
OK
stringio.py
355 B
Del
OK
stringutils.py
16.95 KB
Del
OK
systemd.py
5.51 KB
Del
OK
templates.py
24.03 KB
Del
OK
textformat.py
5.03 KB
Del
OK
thin.py
31.91 KB
Del
OK
timed_subprocess.py
4.06 KB
Del
OK
timeout.py
1.53 KB
Del
OK
timeutil.py
2.4 KB
Del
OK
url.py
5 KB
Del
OK
user.py
11.86 KB
Del
OK
validate
-
Del
OK
value.py
247 B
Del
OK
vault.py
21.74 KB
Del
OK
verify.py
25.34 KB
Del
OK
versions.py
17.17 KB
Del
OK
virt.py
3.24 KB
Del
OK
virtualbox.py
22.43 KB
Del
OK
vmware.py
129.74 KB
Del
OK
vsan.py
17.18 KB
Del
OK
vt.py
31.47 KB
Del
OK
vt_helper.py
4.4 KB
Del
OK
win_chcp.py
3.7 KB
Del
OK
win_dacl.py
95.49 KB
Del
OK
win_dotnet.py
4.74 KB
Del
OK
win_functions.py
12.69 KB
Del
OK
win_lgpo_auditpol.py
8.48 KB
Del
OK
win_lgpo_netsh.py
17.87 KB
Del
OK
win_lgpo_reg.py
16.98 KB
Del
OK
win_network.py
16.35 KB
Del
OK
win_osinfo.py
2.83 KB
Del
OK
win_pdh.py
13.85 KB
Del
OK
win_reg.py
30.82 KB
Del
OK
win_runas.py
10.53 KB
Del
OK
win_service.py
5.2 KB
Del
OK
win_system.py
14.47 KB
Del
OK
win_update.py
40.36 KB
Del
OK
winapi.py
818 B
Del
OK
x509.py
73.19 KB
Del
OK
xdg.py
316 B
Del
OK
xmlutil.py
13.91 KB
Del
OK
yaml.py
349 B
Del
OK
yamldumper.py
3.37 KB
Del
OK
yamlencoding.py
1.55 KB
Del
OK
yamllint.py
1.61 KB
Del
OK
yamlloader.py
6.04 KB
Del
OK
yamlloader_old.py
8.15 KB
Del
OK
yast.py
619 B
Del
OK
zeromq.py
1.74 KB
Del
OK
zfs.py
19.15 KB
Del
OK
Edit: reactor.py
""" Functions which implement running reactor jobs """ import fnmatch import glob import logging import os import salt.client import salt.defaults.exitcodes import salt.runner import salt.state import salt.utils.args import salt.utils.cache import salt.utils.data import salt.utils.event import salt.utils.files import salt.utils.master import salt.utils.process import salt.utils.yaml import salt.wheel log = logging.getLogger(__name__) REACTOR_INTERNAL_KEYWORDS = frozenset( ["__id__", "__sls__", "name", "order", "fun", "key", "state"] ) class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler): """ Read in the reactor configuration variable and compare it to events processed on the master. The reactor has the capability to execute pre-programmed executions as reactions to events """ aliases = { "cmd": "local", } def __init__(self, opts, **kwargs): super().__init__(**kwargs) local_minion_opts = opts.copy() local_minion_opts["file_client"] = "local" self.minion = salt.minion.MasterMinion(local_minion_opts) salt.state.Compiler.__init__(self, opts, self.minion.rend) self.is_leader = True def render_reaction(self, glob_ref, tag, data): """ Execute the render system against a single reaction file and return the data structure """ react = {} if glob_ref.startswith("salt://"): glob_ref = self.minion.functions["cp.cache_file"](glob_ref) or "" globbed_ref = glob.glob(glob_ref) if not globbed_ref: log.error( "Can not render SLS %s for tag %s. File missing or not found.", glob_ref, tag, ) for fn_ in globbed_ref: try: res = self.render_template(fn_, tag=tag, data=data) # for #20841, inject the sls name here since verify_high() # assumes it exists in case there are any errors for name in res: res[name]["__sls__"] = fn_ react.update(res) except Exception: # pylint: disable=broad-except log.exception('Failed to render "%s": ', fn_) return react def list_reactors(self, tag): """ Take in the tag from an event and return a list of the reactors to process """ log.debug("Gathering reactors for tag %s", tag) reactors = [] if isinstance(self.opts["reactor"], str): try: with salt.utils.files.fopen(self.opts["reactor"]) as fp_: react_map = salt.utils.yaml.safe_load(fp_) except OSError: log.error('Failed to read reactor map: "%s"', self.opts["reactor"]) except Exception: # pylint: disable=broad-except log.error( 'Failed to parse YAML in reactor map: "%s"', self.opts["reactor"] ) else: react_map = self.opts["reactor"] for ropt in react_map: if not isinstance(ropt, dict): continue if len(ropt) != 1: continue key = next(iter(ropt.keys())) val = ropt[key] if fnmatch.fnmatch(tag, key): if isinstance(val, str): reactors.append(val) elif isinstance(val, list): reactors.extend(val) return reactors def list_all(self): """ Return a list of the reactors """ if isinstance(self.minion.opts["reactor"], str): log.debug("Reading reactors from yaml %s", self.opts["reactor"]) try: with salt.utils.files.fopen(self.opts["reactor"]) as fp_: react_map = salt.utils.yaml.safe_load(fp_) except OSError: log.error('Failed to read reactor map: "%s"', self.opts["reactor"]) except Exception: # pylint: disable=broad-except log.error( 'Failed to parse YAML in reactor map: "%s"', self.opts["reactor"] ) else: log.debug("Not reading reactors from yaml") react_map = self.minion.opts["reactor"] return react_map def add_reactor(self, tag, reaction): """ Add a reactor """ reactors = self.list_all() for reactor in reactors: _tag = next(iter(reactor.keys())) if _tag == tag: return {"status": False, "comment": "Reactor already exists."} self.minion.opts["reactor"].append({tag: reaction}) return {"status": True, "comment": "Reactor added."} def delete_reactor(self, tag): """ Delete a reactor """ reactors = self.list_all() for reactor in reactors: _tag = next(iter(reactor.keys())) if _tag == tag: self.minion.opts["reactor"].remove(reactor) return {"status": True, "comment": "Reactor deleted."} return {"status": False, "comment": "Reactor does not exists."} def resolve_aliases(self, chunks): """ Preserve backward compatibility by rewriting the 'state' key in the low chunks if it is using a legacy type. """ for idx, _ in enumerate(chunks): new_state = self.aliases.get(chunks[idx]["state"]) if new_state is not None: chunks[idx]["state"] = new_state def reactions(self, tag, data, reactors): """ Render a list of reactor files and returns a reaction struct """ log.debug("Compiling reactions for tag %s", tag) high = {} chunks = [] try: for fn_ in reactors: high.update(self.render_reaction(fn_, tag, data)) if high: errors = self.verify_high(high) if errors: log.error( "Unable to render reactions for event %s due to " "errors (%s) in one or more of the sls files (%s)", tag, errors, reactors, ) return [] # We'll return nothing since there was an error chunks = self.order_chunks(self.compile_high_data(high)) except Exception as exc: # pylint: disable=broad-except log.exception("Exception encountered while compiling reactions") self.resolve_aliases(chunks) return chunks def call_reactions(self, chunks): """ Execute the reaction state """ for chunk in chunks: self.wrap.run(chunk) def run(self): """ Enter into the server loop """ if self.opts["reactor_niceness"] and not salt.utils.platform.is_windows(): log.info("Reactor setting niceness to %i", self.opts["reactor_niceness"]) os.nice(self.opts["reactor_niceness"]) # instantiate some classes inside our new process with salt.utils.event.get_event( self.opts["__role"], self.opts["sock_dir"], opts=self.opts, listen=True, ) as event: self.wrap = ReactWrap(self.opts) for data in event.iter_events(full=True): # skip all events fired by ourselves if data["data"].get("user") == self.wrap.event_user: continue # NOTE: these events must contain the masters key in order to be accepted # see salt.runners.reactor for the requesting interface if "salt/reactors/manage" in data["tag"]: master_key = salt.utils.master.get_master_key("root", self.opts) if data["data"].get("key") != master_key: log.error( "received salt/reactors/manage event without matching" " master_key. discarding" ) continue if data["tag"].endswith("salt/reactors/manage/is_leader"): event.fire_event( {"result": self.is_leader}, "salt/reactors/manage/leader/value" ) if data["tag"].endswith("salt/reactors/manage/set_leader"): # we only want to register events from the local master if data["data"].get("id") == self.opts["id"]: self.is_leader = data["data"]["value"] event.fire_event( {"result": self.is_leader}, "salt/reactors/manage/leader/value" ) if data["tag"].endswith("salt/reactors/manage/add"): _data = data["data"] res = self.add_reactor(_data["event"], _data["reactors"]) event.fire_event( {"reactors": self.list_all(), "result": res}, "salt/reactors/manage/add-complete", ) elif data["tag"].endswith("salt/reactors/manage/delete"): _data = data["data"] res = self.delete_reactor(_data["event"]) event.fire_event( {"reactors": self.list_all(), "result": res}, "salt/reactors/manage/delete-complete", ) elif data["tag"].endswith("salt/reactors/manage/list"): event.fire_event( {"reactors": self.list_all()}, "salt/reactors/manage/list-results", ) else: # do not handle any reactions if not leader in cluster if not self.is_leader: continue else: reactors = self.list_reactors(data["tag"]) if not reactors: continue chunks = self.reactions(data["tag"], data["data"], reactors) if chunks: try: self.call_reactions(chunks) except SystemExit: log.warning("Exit ignored by reactor") class ReactWrap: """ Wrapper that executes low data for the Reactor System """ # class-wide cache of clients client_cache = None event_user = "Reactor" reaction_class = { "local": salt.client.LocalClient, "runner": salt.runner.RunnerClient, "wheel": salt.wheel.Wheel, "caller": salt.client.Caller, } def __init__(self, opts): self.opts = opts if ReactWrap.client_cache is None: ReactWrap.client_cache = salt.utils.cache.CacheDict( opts["reactor_refresh_interval"] ) self.pool = salt.utils.process.ThreadPool( self.opts["reactor_worker_threads"], # number of workers for runner/wheel queue_size=self.opts["reactor_worker_hwm"], # queue size for those workers ) def populate_client_cache(self, low): """ Populate the client cache with an instance of the specified type """ reaction_type = low["state"] if reaction_type not in self.client_cache: log.debug("Reactor is populating %s client cache", reaction_type) if reaction_type in ("runner", "wheel"): # Reaction types that run locally on the master want the full # opts passed. self.client_cache[reaction_type] = self.reaction_class[reaction_type]( self.opts ) # The len() function will cause the module functions to load if # they aren't already loaded. We want to load them so that the # spawned threads don't need to load them. Loading in the # spawned threads creates race conditions such as sometimes not # finding the required function because another thread is in # the middle of loading the functions. len(self.client_cache[reaction_type].functions) else: # Reactions which use remote pubs only need the conf file when # instantiating a client instance. self.client_cache[reaction_type] = self.reaction_class[reaction_type]( self.opts["conf_file"] ) def run(self, low): """ Execute a reaction by invoking the proper wrapper func """ self.populate_client_cache(low) try: l_fun = getattr(self, low["state"]) except AttributeError: log.error("ReactWrap is missing a wrapper function for '%s'", low["state"]) try: wrap_call = salt.utils.args.format_call(l_fun, low) args = wrap_call.get("args", ()) kwargs = wrap_call.get("kwargs", {}) # TODO: Setting user doesn't seem to work for actual remote pubs if low["state"] in ("runner", "wheel"): # Update called function's low data with event user to # segregate events fired by reactor and avoid reaction loops kwargs["__user__"] = self.event_user # Replace ``state`` kwarg which comes from high data compiler. # It breaks some runner functions and seems unnecessary. kwargs["__state__"] = kwargs.pop("state") # NOTE: if any additional keys are added here, they will also # need to be added to filter_kwargs() if "args" in kwargs: # New configuration reactor_args = kwargs.pop("args") for item in ("arg", "kwarg"): if item in low: log.warning( "Reactor '%s' is ignoring '%s' param %s due to " "presence of 'args' param. Check the Reactor System " "documentation for the correct argument format.", low["__id__"], item, low[item], ) if ( low["state"] == "caller" and isinstance(reactor_args, list) and not salt.utils.data.is_dictlist(reactor_args) ): # Legacy 'caller' reactors were already using the 'args' # param, but only supported a list of positional arguments. # If low['args'] is a list but is *not* a dictlist, then # this is actually using the legacy configuration. So, put # the reactor args into kwarg['arg'] so that the wrapper # interprets them as positional args. kwargs["arg"] = reactor_args kwargs["kwarg"] = {} else: kwargs["arg"] = () kwargs["kwarg"] = reactor_args if not isinstance(kwargs["kwarg"], dict): kwargs["kwarg"] = salt.utils.data.repack_dictlist(kwargs["kwarg"]) if not kwargs["kwarg"]: log.error( "Reactor '%s' failed to execute %s '%s': " "Incorrect argument format, check the Reactor System " "documentation for the correct format.", low["__id__"], low["state"], low["fun"], ) return else: # Legacy configuration react_call = {} if low["state"] in ("runner", "wheel"): if "arg" not in kwargs or "kwarg" not in kwargs: # Runner/wheel execute on the master, so we can use # format_call to get the functions args/kwargs react_fun = self.client_cache[low["state"]].functions.get( low["fun"] ) if react_fun is None: log.error( "Reactor '%s' failed to execute %s '%s': " "function not available", low["__id__"], low["state"], low["fun"], ) return react_call = salt.utils.args.format_call( react_fun, low, expected_extra_kws=REACTOR_INTERNAL_KEYWORDS ) if "arg" not in kwargs: kwargs["arg"] = react_call.get("args", ()) if "kwarg" not in kwargs: kwargs["kwarg"] = react_call.get("kwargs", {}) # Execute the wrapper with the proper args/kwargs. kwargs['arg'] # and kwargs['kwarg'] contain the positional and keyword arguments # that will be passed to the client interface to execute the # desired runner/wheel/remote-exec/etc. function. ret = l_fun(*args, **kwargs) if ret is False: log.error( "Reactor '%s' failed to execute %s '%s': " "TaskPool queue is full!" "Consider tuning reactor_worker_threads and/or" " reactor_worker_hwm", low["__id__"], low["state"], low["fun"], ) except SystemExit: log.warning("Reactor '%s' attempted to exit. Ignored.", low["__id__"]) except Exception: # pylint: disable=broad-except log.error( "Reactor '%s' failed to execute %s '%s'", low["__id__"], low["state"], low["fun"], exc_info=True, ) def runner(self, fun, **kwargs): """ Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>` """ return self.pool.fire_async(self.client_cache["runner"].low, args=(fun, kwargs)) def wheel(self, fun, **kwargs): """ Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>` """ return self.pool.fire_async(self.client_cache["wheel"].low, args=(fun, kwargs)) def local(self, fun, tgt, **kwargs): """ Wrap LocalClient for running :ref:`execution modules <all-salt.modules>` """ self.client_cache["local"].cmd_async(tgt, fun, **kwargs) def caller(self, fun, **kwargs): """ Wrap LocalCaller to execute remote exec functions locally on the Minion """ self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])
Save