OILS / core / process_test.py View on Github | oilshell.org

276 lines, 177 significant
1#!/usr/bin/env python2
2"""process_test.py: Tests for process.py."""
3
4import os
5import unittest
6
7from _devbuild.gen.id_kind_asdl import Id
8from _devbuild.gen.runtime_asdl import (RedirValue, redirect_arg, cmd_value,
9 trace)
10from _devbuild.gen.syntax_asdl import loc, redir_loc
11from asdl import runtime
12from builtin import read_osh
13from builtin import trap_osh
14from core import dev
15from core import process # module under test
16from core import pyos
17from core import state
18from core import test_lib
19from core import util
20from display import ui
21from mycpp import iolib
22from mycpp import mylib
23from mycpp.mylib import log
24
25import posix_ as posix
26
27Process = process.Process
28ExternalThunk = process.ExternalThunk
29
30
31def Banner(msg):
32 print('-' * 60)
33 print(msg)
34
35
36def _CommandNode(code_str, arena):
37 c_parser = test_lib.InitCommandParser(code_str, arena=arena)
38 return c_parser.ParseLogicalLine()
39
40
41class FakeJobControl(object):
42
43 def __init__(self, enabled):
44 self.enabled = enabled
45
46 def Enabled(self):
47 return self.enabled
48
49
50class ProcessTest(unittest.TestCase):
51
52 def setUp(self):
53 self.arena = test_lib.MakeArena('process_test.py')
54
55 mem = state.Mem('', [], self.arena, [])
56 parse_opts, exec_opts, mutable_opts = state.MakeOpts(mem, None)
57 mem.exec_opts = exec_opts
58
59 #state.InitMem(mem, {}, '0.1')
60 state.InitDefaultVars(mem)
61
62 self.job_control = process.JobControl()
63 self.job_list = process.JobList()
64
65 signal_safe = iolib.InitSignalSafe()
66 self.trap_state = trap_osh.TrapState(signal_safe)
67
68 fd_state = None
69 self.multi_trace = dev.MultiTracer(posix.getpid(), '', '', '',
70 fd_state)
71 self.tracer = dev.Tracer(None, exec_opts, mutable_opts, mem,
72 mylib.Stderr(), self.multi_trace)
73 self.waiter = process.Waiter(self.job_list, exec_opts, self.trap_state,
74 self.tracer)
75 errfmt = ui.ErrorFormatter()
76 self.fd_state = process.FdState(errfmt, self.job_control,
77 self.job_list, None, self.tracer, None,
78 exec_opts)
79 self.ext_prog = process.ExternalProgram('', self.fd_state, errfmt,
80 util.NullDebugFile())
81
82 def _ExtProc(self, argv):
83 arg_vec = cmd_value.Argv(argv, [loc.Missing] * len(argv), False, None,
84 None)
85 argv0_path = None
86 for path_entry in ['/bin', '/usr/bin']:
87 full_path = os.path.join(path_entry, argv[0])
88 if os.path.exists(full_path):
89 argv0_path = full_path
90 break
91 if not argv0_path:
92 argv0_path = argv[0] # fallback that tests failure case
93 thunk = ExternalThunk(self.ext_prog, argv0_path, arg_vec, {})
94 return Process(thunk, self.job_control, self.job_list, self.tracer)
95
96 def testStdinRedirect(self):
97 PATH = '_tmp/one-two.txt'
98 # Write two lines
99 with open(PATH, 'w') as f:
100 f.write('one\ntwo\n')
101
102 # Should get the first line twice, because Pop() closes it!
103
104 r = RedirValue(Id.Redir_Less, runtime.NO_SPID, redir_loc.Fd(0),
105 redirect_arg.Path(PATH))
106
107 class CommandEvaluator(object):
108
109 def RunPendingTraps(self):
110 pass
111
112 cmd_ev = CommandEvaluator()
113
114 err_out = []
115 self.fd_state.Push([r], err_out)
116 line1, _ = read_osh._ReadPortion(pyos.NEWLINE_CH, -1, cmd_ev)
117 self.fd_state.Pop(err_out)
118
119 self.fd_state.Push([r], err_out)
120 line2, _ = read_osh._ReadPortion(pyos.NEWLINE_CH, -1, cmd_ev)
121 self.fd_state.Pop(err_out)
122
123 # sys.stdin.readline() would erroneously return 'two' because of buffering.
124 self.assertEqual('one', line1)
125 self.assertEqual('one', line2)
126
127 def testProcess(self):
128 # 3 fds. Does Python open it? Shell seems to have it too. Maybe it
129 # inherits from the shell.
130 print('FDS BEFORE', os.listdir('/dev/fd'))
131
132 Banner('date')
133 argv = ['date']
134 p = self._ExtProc(argv)
135 why = trace.External(argv)
136 status = p.RunProcess(self.waiter, why)
137 log('date returned %d', status)
138 self.assertEqual(0, status)
139
140 Banner('does-not-exist')
141 p = self._ExtProc(['does-not-exist'])
142 print(p.RunProcess(self.waiter, why))
143
144 # 12 file descriptors open!
145 print('FDS AFTER', os.listdir('/dev/fd'))
146
147 def testPipeline(self):
148 node = _CommandNode('uniq -c', self.arena)
149 cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
150 ext_prog=self.ext_prog)
151 print('BEFORE', os.listdir('/dev/fd'))
152
153 p = process.Pipeline(False, self.job_control, self.job_list,
154 self.tracer)
155 p.Add(self._ExtProc(['ls']))
156 p.Add(self._ExtProc(['cut', '-d', '.', '-f', '2']))
157 p.Add(self._ExtProc(['sort']))
158
159 p.AddLast((cmd_ev, node))
160
161 p.StartPipeline(self.waiter)
162 pipe_status = p.RunLastPart(self.waiter, self.fd_state)
163 log('pipe_status: %s', pipe_status)
164
165 print('AFTER', os.listdir('/dev/fd'))
166
167 def testPipeline2(self):
168 cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
169 ext_prog=self.ext_prog)
170
171 Banner('ls | cut -d . -f 1 | head')
172 p = process.Pipeline(False, self.job_control, self.job_list,
173 self.tracer)
174 p.Add(self._ExtProc(['ls']))
175 p.Add(self._ExtProc(['cut', '-d', '.', '-f', '1']))
176
177 node = _CommandNode('head', self.arena)
178 p.AddLast((cmd_ev, node))
179
180 p.StartPipeline(self.waiter)
181 print(p.RunLastPart(self.waiter, self.fd_state))
182
183 # Simulating subshell for each command
184 node1 = _CommandNode('ls', self.arena)
185 node2 = _CommandNode('head', self.arena)
186 node3 = _CommandNode('sort --reverse', self.arena)
187
188 thunk1 = process.SubProgramThunk(cmd_ev, node1, self.trap_state,
189 self.multi_trace, True, False)
190 thunk2 = process.SubProgramThunk(cmd_ev, node2, self.trap_state,
191 self.multi_trace, True, False)
192 thunk3 = process.SubProgramThunk(cmd_ev, node3, self.trap_state,
193 self.multi_trace, True, False)
194
195 p = process.Pipeline(False, self.job_control, self.job_list,
196 self.tracer)
197 p.Add(Process(thunk1, self.job_control, self.job_list, self.tracer))
198 p.Add(Process(thunk2, self.job_control, self.job_list, self.tracer))
199 p.Add(Process(thunk3, self.job_control, self.job_list, self.tracer))
200
201 last_thunk = (cmd_ev, _CommandNode('cat', self.arena))
202 p.AddLast(last_thunk)
203
204 p.StartPipeline(self.waiter)
205 print(p.RunLastPart(self.waiter, self.fd_state))
206
207 # TODO: Combine pipelines for other things:
208
209 # echo foo 1>&2 | tee stdout.txt
210 #
211 # foo=$(ls | head)
212 #
213 # foo=$(<<EOF ls | head)
214 # stdin
215 # EOF
216 #
217 # ls | head &
218
219 # Or technically we could fork the whole interpreter for foo|bar|baz and
220 # capture stdout of that interpreter.
221
222 def makeTestPipeline(self, jc):
223 cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
224 ext_prog=self.ext_prog)
225
226 pi = process.Pipeline(False, jc, self.job_list, self.tracer)
227
228 node1 = _CommandNode('/bin/echo testpipeline', self.arena)
229 node2 = _CommandNode('cat', self.arena)
230
231 thunk1 = process.SubProgramThunk(cmd_ev, node1, self.trap_state,
232 self.multi_trace, True, False)
233 thunk2 = process.SubProgramThunk(cmd_ev, node2, self.trap_state,
234 self.multi_trace, True, False)
235
236 pi.Add(Process(thunk1, jc, self.job_list, self.tracer))
237 pi.Add(Process(thunk2, jc, self.job_list, self.tracer))
238
239 return pi
240
241 def testPipelinePgidField(self):
242 jc = FakeJobControl(False)
243
244 pi = self.makeTestPipeline(jc)
245 self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())
246
247 pi.StartPipeline(self.waiter)
248 # No pgid
249 self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())
250
251 jc = FakeJobControl(True)
252
253 pi = self.makeTestPipeline(jc)
254 self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())
255
256 pi.StartPipeline(self.waiter)
257 # first process is the process group leader
258 self.assertEqual(pi.pids[0], pi.ProcessGroupId())
259
260 def testOpen(self):
261 # Disabled because mycpp translation can't handle it. We do this at a
262 # higher layer.
263 return
264
265 # This function used to raise BOTH OSError and IOError because Python 2 is
266 # inconsistent.
267 # We follow Python 3 in preferring OSError.
268 # https://stackoverflow.com/questions/29347790/difference-between-ioerror-and-oserror
269 self.assertRaises(OSError, self.fd_state.Open, '_nonexistent_')
270 self.assertRaises(OSError, self.fd_state.Open, 'metrics/')
271
272
273if __name__ == '__main__':
274 unittest.main()
275
276# vim: sw=4