Coverage for CIResults/sandbox/io.py: 100%
134 statements
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-19 09:20 +0000
« prev ^ index » next coverage.py v7.6.9, created at 2024-12-19 09:20 +0000
1import os
2import sys
3import json
4import array
5import struct
6import subprocess
7from threading import RLock
8from subprocess import PIPE
9from types import ModuleType
11# Make sure the sandbox module is available
12sys.path.append(os.path.dirname(os.path.normpath(__file__)))
13from lockdown import LockDown # noqa
16class IOWrapper:
17 # Commands
18 EXEC_USER_SCRIPT = "exec_user_script"
19 CALL_USER_FUNCTION = "call_user_function"
21 def __init__(self, stream_in=None, stream_out=None):
22 self.stream_in = stream_in if stream_in else sys.stdin.buffer
23 self.stream_out = stream_out if stream_out else sys.stdout.buffer
25 def send(self, data):
26 if not isinstance(data, bytes):
27 data = str(data).encode()
28 length = len(data)
29 header = array.array('b', b'\0\0\0\0') # 32 bit length
30 struct.pack_into(">L", header, 0, length)
31 self.stream_out.write(header.tobytes() + data)
32 self.stream_out.flush()
34 def read(self):
35 try:
36 header = self.stream_in.read(4)
37 length = struct.unpack(">L", header)[0]
38 content = self.stream_in.read(length)
40 if len(content) != length:
41 raise IOError("The message read is shorter than expected")
43 return content.decode()
44 except struct.error:
45 raise IOError("Invalid message format")
48class Server:
49 def __init__(self, stream_in=None, stream_out=None, lockdown=True):
50 self.iowrapper = IOWrapper(stream_in=stream_in, stream_out=stream_out)
51 self.usr_module = None
53 # NOTE: The code is covered, but since we have to run it in a separate
54 # process, the coverage does not pick it up
55 if lockdown:
56 from seccomplite import ALLOW, EQ, Arg # pragma: no cover
58 ld = LockDown() # pragma: no cover
59 ld.add_rule(ALLOW, "read", Arg(0, EQ, self.iowrapper.stream_in.fileno())) # pragma: no cover
60 ld.add_rule(ALLOW, "write", Arg(0, EQ, self.iowrapper.stream_out.fileno())) # pragma: no cover
61 ld.add_rule(ALLOW, "write", Arg(0, EQ, sys.stderr.fileno())) # pragma: no cover
62 ld.start() # pragma: no cover
64 def serve_request(self):
65 rpc_call = json.loads(self.iowrapper.read())
66 try:
67 # NOTE: All the callabable methods are prefixed with rpc__
68 func = getattr(self, "rpc__" + rpc_call['method'])
69 ret = func(rpc_call['kwargs'], rpc_call['version'])
70 self.iowrapper.send(json.dumps(ret))
71 except AttributeError:
72 self.iowrapper.send(json.dumps({'return_code': 404}))
74 def serve_forever(self):
75 while True: # pragma: no cover
76 self.serve_request() # pragma: no cover
78 # ----- RPC methods (need to be prefixed with 'rpc__') -----
80 def rpc__exec_user_script(self, kwargs, version):
81 # remove the current user module to allow the users to send new scripts
82 # without restarting the service.
83 del self.usr_module
85 user_script = kwargs.get('script')
86 if user_script is None:
87 return {'return_code': 400, 'reason': "The 'script' parameter is missing"}
89 # Create a dynamic module
90 self.usr_module = ModuleType('usr_script')
91 try:
92 code = compile(user_script, "<user script>", 'exec')
93 exec(code, self.usr_module.__dict__)
94 except Exception as e:
95 return {'return_code': 400, 'reason': 'Compilation error: {}'.format(e)}
97 return {'return_code': 200}
99 def rpc__call_user_function(self, kwargs, version):
100 if 'user_function' not in kwargs or 'user_kwargs' not in kwargs:
101 return {'return_code': 400,
102 'reason': "Both the 'user_function' and 'user_kwargs' parameters are needed"}
104 try:
105 func = getattr(self.usr_module, kwargs['user_function'])
106 except AttributeError:
107 return {'return_code': 404, 'reason': 'The user function does not exist'}
109 try:
110 ret = func(**kwargs['user_kwargs'])
111 return {'return_code': 200, 'return_value': ret}
112 except Exception as e:
113 return {'return_code': 400, 'reason': str(e)}
116class Client:
117 _instance_cache = dict()
119 class UserFunctionCallError(Exception):
120 def __init__(self, return_code, reason):
121 self.return_code = return_code
122 self.reason = reason
124 def __str__(self):
125 return "UserFunctionCallError {}: {}".format(self.return_code, self.reason)
127 @classmethod
128 def get_or_create_instance(cls, script):
129 instance = cls._instance_cache.get(script)
130 if instance is None:
131 instance = cls._instance_cache[script] = cls(script)
132 return instance
134 @property
135 def interpreter(self):
136 """
137 Intuitively, 'sys.executable' would be used for this purpose, but when using
138 uwsgi, it clobbers the actual interpreter pointed to by sys.executable, and
139 replaces it with the path to the uwsgi executable...because that makes sense.
141 One workaround would be to use a shebang, but that won't work, as the env is wiped
142 when starting the server process. So the PATH variable won't reflect the updated
143 PATH when using a venv (either locally or in the Docker container).
145 `which python3` works, as it will return the path to the interpreter based on the
146 PATH variable. This works whether using a venv locally or in the docker container,
147 as it is executed separately in the current env.
149 NOTE: This is not 100% foolproof. If the user is not using the same interpreter
150 for execution as is pointed to in their PATH var, then this could call a different
151 interpreter. That would be bad practice, but it could happen.
153 If following the standard setup guide, or using Docker containers, this
154 works perfectly fine though. There could be room for improvement here, as uwsgi
155 maintainers are well aware of the 'sys.executable' issue and don't seem to care.
156 """
157 if hasattr(self, '_interpreter'):
158 return self._interpreter
160 interp = subprocess.check_output(["which", "python3"])
161 self._interpreter = interp.decode().strip()
162 return self._interpreter
164 def shutdown(self):
165 if hasattr(self, "sesh"):
166 self.sesh.kill()
168 try:
169 del self._instance_cache[self.usr_script]
170 except KeyError:
171 pass # Already garbage collected
173 def _restart_server(self):
174 self.shutdown()
176 self.sesh = subprocess.Popen([self.interpreter, __file__], stdin=PIPE, stdout=PIPE,
177 universal_newlines=False, env={})
178 self.iowrapper = IOWrapper(stream_in=self.sesh.stdout, stream_out=self.sesh.stdin)
180 ret = self.rpc_call(self.iowrapper.EXEC_USER_SCRIPT, {'script': self.usr_script}, retry=False)
181 if ret.get('return_code') != 200:
182 raise ValueError('Error {} - Cannot set the user script: {}'.format(ret.get('return_code'),
183 ret.get('reason')))
185 def __init__(self, usr_script):
186 self.usr_script = usr_script
188 self.lock = RLock()
189 self._restart_server()
191 def __del__(self):
192 self.shutdown()
194 def rpc_call(self, method, kwargs=None, version=1, retry=True):
195 if kwargs is None:
196 kwargs = {}
198 # Make sure we do not have multiple RPC calls at the same time
199 with self.lock:
200 # Try up to 3 times to make the call, and restart the server after each fail
201 for i in range(3):
202 try:
203 self.iowrapper.send(json.dumps({'method': method, 'kwargs': kwargs, 'version': version}))
204 return json.loads(self.iowrapper.read())
205 except Exception:
206 pass
208 # restart the server, since we got an unexpected output
209 if retry:
210 self._restart_server()
211 else:
212 break
214 raise IOError("Failed to make an RPC call: method='{}', kwargs={}. version={}".format(method,
215 kwargs,
216 version))
218 def call_user_function(self, func_name, kwargs):
219 ret = self.rpc_call(self.iowrapper.CALL_USER_FUNCTION, {"user_function": func_name, "user_kwargs": kwargs})
220 if ret.get('return_code') == 200:
221 return ret.get('return_value')
222 else:
223 raise Client.UserFunctionCallError(ret.get('return_code'), ret.get('reason'))
226# If the script is run directly, just start a server
227if __name__ == "__main__":
228 # This code is tested by test_sandbox.IntegrationTests, but coverage cannot access line coverage there
229 __io_s = Server(lockdown=True) # pragma: no cover
230 __io_s.serve_forever() # pragma: no cover