golden hour
/opt/cloudlinux/venv/lib/python3.11/site-packages/aiohttp
⬆️ Go Up
Upload
File/Folder
Size
Actions
.hash
-
Del
OK
__init__.py
7.58 KB
Del
OK
__pycache__
-
Del
OK
_cparser.pxd
4.22 KB
Del
OK
_find_header.pxd
68 B
Del
OK
_headers.pxi
1.96 KB
Del
OK
_helpers.cpython-311-x86_64-linux-gnu.so
563.5 KB
Del
OK
_helpers.pyi
202 B
Del
OK
_helpers.pyx
1.02 KB
Del
OK
_http_parser.cpython-311-x86_64-linux-gnu.so
2.66 MB
Del
OK
_http_parser.pyx
27.4 KB
Del
OK
_http_writer.cpython-311-x86_64-linux-gnu.so
477.98 KB
Del
OK
_http_writer.pyx
4.47 KB
Del
OK
_websocket.cpython-311-x86_64-linux-gnu.so
249.84 KB
Del
OK
_websocket.pyx
1.52 KB
Del
OK
abc.py
5.37 KB
Del
OK
base_protocol.py
2.68 KB
Del
OK
client.py
46.17 KB
Del
OK
client_exceptions.py
9.19 KB
Del
OK
client_proto.py
8.45 KB
Del
OK
client_reqrep.py
38.75 KB
Del
OK
client_ws.py
10.75 KB
Del
OK
compression_utils.py
4.9 KB
Del
OK
connector.py
51.56 KB
Del
OK
cookiejar.py
13.69 KB
Del
OK
formdata.py
5.96 KB
Del
OK
hdrs.py
4.5 KB
Del
OK
helpers.py
29.55 KB
Del
OK
http.py
1.8 KB
Del
OK
http_exceptions.py
2.65 KB
Del
OK
http_parser.py
34.66 KB
Del
OK
http_websocket.py
26.09 KB
Del
OK
http_writer.py
5.79 KB
Del
OK
locks.py
1.11 KB
Del
OK
log.py
325 B
Del
OK
multipart.py
31.71 KB
Del
OK
payload.py
13.22 KB
Del
OK
payload_streamer.py
2.04 KB
Del
OK
py.typed
7 B
Del
OK
pytest_plugin.py
11.33 KB
Del
OK
resolver.py
4.95 KB
Del
OK
streams.py
20.35 KB
Del
OK
tcp_helpers.py
961 B
Del
OK
test_utils.py
19.71 KB
Del
OK
tracing.py
14.78 KB
Del
OK
typedefs.py
1.44 KB
Del
OK
web.py
18.81 KB
Del
OK
web_app.py
17.88 KB
Del
OK
web_exceptions.py
10.12 KB
Del
OK
web_fileresponse.py
11.15 KB
Del
OK
web_log.py
7.62 KB
Del
OK
web_middlewares.py
3.94 KB
Del
OK
web_protocol.py
22.5 KB
Del
OK
web_request.py
28.08 KB
Del
OK
web_response.py
27.08 KB
Del
OK
web_routedef.py
5.99 KB
Del
OK
web_runner.py
11.46 KB
Del
OK
web_server.py
2.53 KB
Del
OK
web_urldispatcher.py
39.12 KB
Del
OK
web_ws.py
18.21 KB
Del
OK
worker.py
7.78 KB
Del
OK
Edit: compression_utils.py
import asyncio import zlib from concurrent.futures import Executor from typing import Optional, cast try: try: import brotlicffi as brotli except ImportError: import brotli HAS_BROTLI = True except ImportError: # pragma: no cover HAS_BROTLI = False MAX_SYNC_CHUNK_SIZE = 1024 def encoding_to_mode( encoding: Optional[str] = None, suppress_deflate_header: bool = False, ) -> int: if encoding == "gzip": return 16 + zlib.MAX_WBITS return -zlib.MAX_WBITS if suppress_deflate_header else zlib.MAX_WBITS class ZlibBaseHandler: def __init__( self, mode: int, executor: Optional[Executor] = None, max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, ): self._mode = mode self._executor = executor self._max_sync_chunk_size = max_sync_chunk_size class ZLibCompressor(ZlibBaseHandler): def __init__( self, encoding: Optional[str] = None, suppress_deflate_header: bool = False, level: Optional[int] = None, wbits: Optional[int] = None, strategy: int = zlib.Z_DEFAULT_STRATEGY, executor: Optional[Executor] = None, max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, ): super().__init__( mode=encoding_to_mode(encoding, suppress_deflate_header) if wbits is None else wbits, executor=executor, max_sync_chunk_size=max_sync_chunk_size, ) if level is None: self._compressor = zlib.compressobj(wbits=self._mode, strategy=strategy) else: self._compressor = zlib.compressobj( wbits=self._mode, strategy=strategy, level=level ) self._compress_lock = asyncio.Lock() def compress_sync(self, data: bytes) -> bytes: return self._compressor.compress(data) async def compress(self, data: bytes) -> bytes: async with self._compress_lock: # To ensure the stream is consistent in the event # there are multiple writers, we need to lock # the compressor so that only one writer can # compress at a time. if ( self._max_sync_chunk_size is not None and len(data) > self._max_sync_chunk_size ): return await asyncio.get_event_loop().run_in_executor( self._executor, self.compress_sync, data ) return self.compress_sync(data) def flush(self, mode: int = zlib.Z_FINISH) -> bytes: return self._compressor.flush(mode) class ZLibDecompressor(ZlibBaseHandler): def __init__( self, encoding: Optional[str] = None, suppress_deflate_header: bool = False, executor: Optional[Executor] = None, max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, ): super().__init__( mode=encoding_to_mode(encoding, suppress_deflate_header), executor=executor, max_sync_chunk_size=max_sync_chunk_size, ) self._decompressor = zlib.decompressobj(wbits=self._mode) def decompress_sync(self, data: bytes, max_length: int = 0) -> bytes: return self._decompressor.decompress(data, max_length) async def decompress(self, data: bytes, max_length: int = 0) -> bytes: if ( self._max_sync_chunk_size is not None and len(data) > self._max_sync_chunk_size ): return await asyncio.get_event_loop().run_in_executor( self._executor, self.decompress_sync, data, max_length ) return self.decompress_sync(data, max_length) def flush(self, length: int = 0) -> bytes: return ( self._decompressor.flush(length) if length > 0 else self._decompressor.flush() ) @property def eof(self) -> bool: return self._decompressor.eof @property def unconsumed_tail(self) -> bytes: return self._decompressor.unconsumed_tail @property def unused_data(self) -> bytes: return self._decompressor.unused_data class BrotliDecompressor: # Supports both 'brotlipy' and 'Brotli' packages # since they share an import name. The top branches # are for 'brotlipy' and bottom branches for 'Brotli' def __init__(self) -> None: if not HAS_BROTLI: raise RuntimeError( "The brotli decompression is not available. " "Please install `Brotli` module" ) self._obj = brotli.Decompressor() def decompress_sync(self, data: bytes) -> bytes: if hasattr(self._obj, "decompress"): return cast(bytes, self._obj.decompress(data)) return cast(bytes, self._obj.process(data)) def flush(self) -> bytes: if hasattr(self._obj, "flush"): return cast(bytes, self._obj.flush()) return b""
Save