OILS / core / process_test.py View on Github | oilshell.org

274 lines, 176 significant
1#!/usr/bin/env python2
2"""process_test.py: Tests for process.py."""
3
4import os
5import unittest
6
7from _devbuild.gen.id_kind_asdl import Id
8from _devbuild.gen.runtime_asdl import (RedirValue, redirect_arg, cmd_value,
9 trace)
10from _devbuild.gen.syntax_asdl import loc, redir_loc
11from asdl import runtime
12from builtin import read_osh
13from builtin import trap_osh
14from core import dev
15from core import process # module under test
16from core import pyos
17from core import test_lib
18from display import ui
19from core import util
20from mycpp.mylib import log
21from core import state
22from mycpp import mylib
23
24import posix_ as posix
25
# Module-level shorthands for the two process.py classes the tests below
# construct directly.
ExternalThunk = process.ExternalThunk
Process = process.Process
28
29
def Banner(msg):
    """Print a rule of 60 dashes, then msg on the following line."""
    rule = '-' * 60
    print(rule)
    print(msg)
33
34
def _CommandNode(code_str, arena):
    """Parse code_str with a test command parser; return one logical line.

    The parser comes from test_lib.InitCommandParser() and is bound to the
    given arena; the return value is whatever its ParseLogicalLine() yields.
    """
    return test_lib.InitCommandParser(code_str,
                                      arena=arena).ParseLogicalLine()
38
39
class FakeJobControl(object):
    """Test double whose Enabled() answer is fixed by the constructor.

    The tests below pass it where a process.JobControl() is otherwise used.
    """

    def __init__(self, enabled):
        """Remember the value that Enabled() should report."""
        self.enabled = enabled

    def Enabled(self):
        """Return the stored `enabled` value unchanged."""
        return self.enabled
47
48
class ProcessTest(unittest.TestCase):
    """Tests for core/process.py: Process, Pipeline and FdState.

    Several tests run external programs (date, ls, cut, sort, ...) and list
    /dev/fd, and most of them only print or log what they observe rather than
    asserting on it.
    """

    def setUp(self):
        """Build the arena, options, tracer, waiter, FdState and
        ExternalProgram that the individual tests share."""
        self.arena = test_lib.MakeArena('process_test.py')

        mem = state.Mem('', [], self.arena, [])
        # MakeOpts() also returns parse_opts, which nothing here uses.
        _, exec_opts, mutable_opts = state.MakeOpts(mem, None)
        mem.exec_opts = exec_opts

        state.InitMem(mem, {}, '0.1')

        self.job_control = process.JobControl()
        self.job_list = process.JobList()

        signal_safe = pyos.InitSignalSafe()
        self.trap_state = trap_osh.TrapState(signal_safe)

        # The MultiTracer is built before self.fd_state exists, so it is
        # given None for its fd_state argument.
        fd_state = None
        self.multi_trace = dev.MultiTracer(posix.getpid(), '', '', '',
                                           fd_state)
        self.tracer = dev.Tracer(None, exec_opts, mutable_opts, mem,
                                 mylib.Stderr(), self.multi_trace)
        self.waiter = process.Waiter(self.job_list, exec_opts, self.trap_state,
                                     self.tracer)
        errfmt = ui.ErrorFormatter()
        self.fd_state = process.FdState(errfmt, self.job_control,
                                        self.job_list, None, self.tracer, None,
                                        exec_opts)
        self.ext_prog = process.ExternalProgram('', self.fd_state, errfmt,
                                                util.NullDebugFile())

    def _ExtProc(self, argv):
        """Return a Process wrapping an ExternalThunk for argv.

        argv[0] is looked up in /bin and then /usr/bin.  If neither directory
        contains it, the bare argv[0] is used as the path, which is how the
        tests exercise the failure case.
        """
        arg_vec = cmd_value.Argv(argv, [loc.Missing] * len(argv), False, None,
                                 None)
        argv0_path = None
        for path_entry in ['/bin', '/usr/bin']:
            full_path = os.path.join(path_entry, argv[0])
            if os.path.exists(full_path):
                argv0_path = full_path
                break
        if not argv0_path:
            argv0_path = argv[0]  # fallback that tests failure case
        thunk = ExternalThunk(self.ext_prog, argv0_path, arg_vec, {})
        return Process(thunk, self.job_control, self.job_list, self.tracer)

    def testStdinRedirect(self):
        """Redirecting stdin from the same file twice reads line one twice."""
        PATH = '_tmp/one-two.txt'
        # Write two lines
        with open(PATH, 'w') as f:
            f.write('one\ntwo\n')

        # Should get the first line twice, because Pop() closes it!

        r = RedirValue(Id.Redir_Less, runtime.NO_SPID, redir_loc.Fd(0),
                       redirect_arg.Path(PATH))

        # Minimal stand-in for the evaluator argument of _ReadPortion();
        # RunPendingTraps() is the only method this stub provides.
        class CommandEvaluator(object):

            def RunPendingTraps(self):
                pass

        cmd_ev = CommandEvaluator()

        err_out = []
        self.fd_state.Push([r], err_out)
        line1, _ = read_osh._ReadPortion(pyos.NEWLINE_CH, -1, cmd_ev)
        self.fd_state.Pop(err_out)

        self.fd_state.Push([r], err_out)
        line2, _ = read_osh._ReadPortion(pyos.NEWLINE_CH, -1, cmd_ev)
        self.fd_state.Pop(err_out)

        # sys.stdin.readline() would erroneously return 'two' because of
        # buffering.
        self.assertEqual('one', line1)
        self.assertEqual('one', line2)

    def testProcess(self):
        """Run one program that exists (date) and one that does not."""
        # 3 fds.  Does Python open it?  Shell seems to have it too.  Maybe it
        # inherits from the shell.
        print('FDS BEFORE', os.listdir('/dev/fd'))

        Banner('date')
        argv = ['date']
        p = self._ExtProc(argv)
        why = trace.External(argv)
        status = p.RunProcess(self.waiter, why)
        log('date returned %d', status)
        self.assertEqual(0, status)

        Banner('does-not-exist')
        argv = ['does-not-exist']
        p = self._ExtProc(argv)
        # Build a fresh trace reason: reusing the one created for 'date'
        # would describe this process with the wrong argv.
        why = trace.External(argv)
        status = p.RunProcess(self.waiter, why)
        print(status)
        # A command that cannot be executed must not report success.
        self.assertNotEqual(0, status)

        # 12 file descriptors open!
        print('FDS AFTER', os.listdir('/dev/fd'))

    def testPipeline(self):
        """Run ls | cut -d . -f 2 | sort | uniq -c and log the result.

        The first three parts are external processes; 'uniq -c' is added with
        AddLast() as a (cmd_ev, node) pair.
        """
        node = _CommandNode('uniq -c', self.arena)
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)
        print('BEFORE', os.listdir('/dev/fd'))

        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        p.Add(self._ExtProc(['ls']))
        p.Add(self._ExtProc(['cut', '-d', '.', '-f', '2']))
        p.Add(self._ExtProc(['sort']))

        p.AddLast((cmd_ev, node))

        p.StartPipeline(self.waiter)
        pipe_status = p.RunLastPart(self.waiter, self.fd_state)
        log('pipe_status: %s', pipe_status)

        print('AFTER', os.listdir('/dev/fd'))

    def testPipeline2(self):
        """Run two more pipelines: one built from external processes, one
        built from SubProgramThunk processes.  Results are only printed."""
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)

        Banner('ls | cut -d . -f 1 | head')
        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        p.Add(self._ExtProc(['ls']))
        p.Add(self._ExtProc(['cut', '-d', '.', '-f', '1']))

        node = _CommandNode('head', self.arena)
        p.AddLast((cmd_ev, node))

        p.StartPipeline(self.waiter)
        print(p.RunLastPart(self.waiter, self.fd_state))

        # Simulating subshell for each command
        node1 = _CommandNode('ls', self.arena)
        node2 = _CommandNode('head', self.arena)
        node3 = _CommandNode('sort --reverse', self.arena)

        thunk1 = process.SubProgramThunk(cmd_ev, node1, self.trap_state,
                                         self.multi_trace, True, False)
        thunk2 = process.SubProgramThunk(cmd_ev, node2, self.trap_state,
                                         self.multi_trace, True, False)
        thunk3 = process.SubProgramThunk(cmd_ev, node3, self.trap_state,
                                         self.multi_trace, True, False)

        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        p.Add(Process(thunk1, self.job_control, self.job_list, self.tracer))
        p.Add(Process(thunk2, self.job_control, self.job_list, self.tracer))
        p.Add(Process(thunk3, self.job_control, self.job_list, self.tracer))

        last_thunk = (cmd_ev, _CommandNode('cat', self.arena))
        p.AddLast(last_thunk)

        p.StartPipeline(self.waiter)
        print(p.RunLastPart(self.waiter, self.fd_state))

        # TODO: Combine pipelines for other things:

        # echo foo 1>&2 | tee stdout.txt
        #
        # foo=$(ls | head)
        #
        # foo=$(<<EOF ls | head)
        # stdin
        # EOF
        #
        # ls | head &

        # Or technically we could fork the whole interpreter for foo|bar|baz
        # and capture stdout of that interpreter.

    def makeTestPipeline(self, jc):
        """Return a two-process Pipeline that has not been started yet.

        The parts are '/bin/echo testpipeline' and 'cat', each wrapped in a
        SubProgramThunk.  jc is the job control object handed to the Pipeline
        and to both Process objects.
        """
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)

        pi = process.Pipeline(False, jc, self.job_list, self.tracer)

        node1 = _CommandNode('/bin/echo testpipeline', self.arena)
        node2 = _CommandNode('cat', self.arena)

        thunk1 = process.SubProgramThunk(cmd_ev, node1, self.trap_state,
                                         self.multi_trace, True, False)
        thunk2 = process.SubProgramThunk(cmd_ev, node2, self.trap_state,
                                         self.multi_trace, True, False)

        pi.Add(Process(thunk1, jc, self.job_list, self.tracer))
        pi.Add(Process(thunk2, jc, self.job_list, self.tracer))

        return pi

    def testPipelinePgidField(self):
        """ProcessGroupId() stays INVALID_PGID without job control, and equals
        the first pid once a pipeline is started with job control enabled."""
        jc = FakeJobControl(False)

        pi = self.makeTestPipeline(jc)
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        pi.StartPipeline(self.waiter)
        # No pgid
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        jc = FakeJobControl(True)

        pi = self.makeTestPipeline(jc)
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        pi.StartPipeline(self.waiter)
        # first process is the process group leader
        self.assertEqual(pi.pids[0], pi.ProcessGroupId())

    def testOpen(self):
        """Disabled: returns immediately, so the assertions below never run."""
        # Disabled because mycpp translation can't handle it.  We do this at a
        # higher layer.
        return

        # This function used to raise BOTH OSError and IOError because Python
        # 2 is inconsistent.
        # We follow Python 3 in preferring OSError.
        # https://stackoverflow.com/questions/29347790/difference-between-ioerror-and-oserror
        self.assertRaises(OSError, self.fd_state.Open, '_nonexistent_')
        self.assertRaises(OSError, self.fd_state.Open, 'metrics/')
270
# Run every TestCase in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
273
274# vim: sw=4