golden hour
/opt/saltstack/salt/lib/python3.10/site-packages/salt/transport
⬆️ Go Up
Upload
File/Folder
Size
Actions
__init__.py
507 B
Del
OK
__pycache__
-
Del
OK
base.py
7.79 KB
Del
OK
client.py
3.43 KB
Del
OK
frame.py
2.76 KB
Del
OK
ipc.py
27.66 KB
Del
OK
local.py
1.37 KB
Del
OK
server.py
1.22 KB
Del
OK
tcp.py
36.58 KB
Del
OK
zeromq.py
33.15 KB
Del
OK
Edit: local.py
import logging import salt.utils.files from salt.channel.client import ReqChannel log = logging.getLogger(__name__) class LocalChannel(ReqChannel): """ Local channel for testing purposes """ def __init__(self, opts, **kwargs): self.opts = opts self.kwargs = kwargs self.tries = 0 def close(self): """ Close the local channel. Currently a NOOP """ def send(self, load, tries=3, timeout=60, raw=False): if self.tries == 0: log.debug("LocalChannel load: %s", load) # data = json.loads(load) # {'path': 'apt-cacher-ng/map.jinja', 'saltenv': 'base', 'cmd': '_serve_file', 'loc': 0} # f = open(data['path']) with salt.utils.files.fopen(load["path"]) as f: ret = { "data": "".join(f.readlines()), "dest": load["path"], } print("returning", ret) else: # end of buffer ret = { "data": None, "dest": None, } self.tries = self.tries + 1 return ret def crypted_transfer_decode_dictentry( self, load, dictkey=None, tries=3, timeout=60 ): super().crypted_transfer_decode_dictentry( load, dictkey=dictkey, tries=tries, timeout=timeout )
Save