OILS / core / process_test.py View on Github | oilshell.org

275 lines, 176 significant
1#!/usr/bin/env python2
2"""process_test.py: Tests for process.py."""
3
4import os
5import unittest
6
7from _devbuild.gen.id_kind_asdl import Id
8from _devbuild.gen.runtime_asdl import (RedirValue, redirect_arg, cmd_value,
9 trace)
10from _devbuild.gen.syntax_asdl import loc, redir_loc
11from asdl import runtime
12from builtin import read_osh
13from builtin import trap_osh
14from core import dev
15from core import process # module under test
16from core import pyos
17from core import test_lib
18from display import ui
19from core import util
20from mycpp.mylib import log
21from core import state
22from mycpp import mylib
23
24import posix_ as posix
25
# Short aliases for the classes from the module under test that the tests
# below construct directly.
ExternalThunk = process.ExternalThunk
Process = process.Process
28
29
def Banner(msg):
    """Print a 60-character dashed separator line followed by msg."""
    separator = '-' * 60
    print(separator)
    print(msg)
33
34
def _CommandNode(code_str, arena):
    """Parse code_str with a test command parser and return one logical line.

    The parser is created by test_lib.InitCommandParser() using the given
    arena; the result is whatever its ParseLogicalLine() returns.
    """
    return test_lib.InitCommandParser(code_str,
                                      arena=arena).ParseLogicalLine()
38
39
class FakeJobControl(object):
    """Test double for job control whose Enabled() answer is fixed up front."""

    def __init__(self, enabled):
        # The value that Enabled() reports back.
        self.enabled = enabled

    def Enabled(self):
        """Return the flag this fake was constructed with."""
        return self.enabled
47
48
class ProcessTest(unittest.TestCase):
    """Tests for process.py: redirects, external processes, and pipelines."""

    def setUp(self):
        """Build the shell state objects that every test in this class uses."""
        self.arena = test_lib.MakeArena('process_test.py')

        mem = state.Mem('', [], self.arena, [])
        # The first element (parse options) is not needed by these tests.
        _, exec_opts, mutable_opts = state.MakeOpts(mem, None)
        mem.exec_opts = exec_opts

        #state.InitMem(mem, {}, '0.1')
        state.InitDefaultVars(mem)

        self.job_control = process.JobControl()
        self.job_list = process.JobList()

        signal_safe = pyos.InitSignalSafe()
        self.trap_state = trap_osh.TrapState(signal_safe)

        # The multi-tracer is built without an FdState here.
        fd_state = None
        self.multi_trace = dev.MultiTracer(posix.getpid(), '', '', '',
                                           fd_state)
        self.tracer = dev.Tracer(None, exec_opts, mutable_opts, mem,
                                 mylib.Stderr(), self.multi_trace)
        self.waiter = process.Waiter(self.job_list, exec_opts,
                                     self.trap_state, self.tracer)
        errfmt = ui.ErrorFormatter()
        self.fd_state = process.FdState(errfmt, self.job_control,
                                        self.job_list, None, self.tracer,
                                        None, exec_opts)
        self.ext_prog = process.ExternalProgram('', self.fd_state, errfmt,
                                                util.NullDebugFile())

    def _ExtProc(self, argv):
        """Return a Process that runs the external command given by argv.

        argv[0] is looked up in /bin and then /usr/bin.  If it is found in
        neither, the bare name is used as the path, which exercises the
        failure case.
        """
        arg_vec = cmd_value.Argv(argv, [loc.Missing] * len(argv), False, None,
                                 None)

        argv0_path = argv[0]  # fallback that tests failure case
        for path_entry in ['/bin', '/usr/bin']:
            candidate = os.path.join(path_entry, argv[0])
            if os.path.exists(candidate):
                argv0_path = candidate
                break

        thunk = ExternalThunk(self.ext_prog, argv0_path, arg_vec, {})
        return Process(thunk, self.job_control, self.job_list, self.tracer)

    def testStdinRedirect(self):
        """Redirecting stdin from the same file twice reads line one twice."""
        PATH = '_tmp/one-two.txt'
        # Write two lines
        with open(PATH, 'w') as f:
            f.write('one\ntwo\n')

        # Should get the first line twice, because Pop() closes it!

        r = RedirValue(Id.Redir_Less, runtime.NO_SPID, redir_loc.Fd(0),
                       redirect_arg.Path(PATH))

        class CommandEvaluator(object):
            """Stub that only provides the RunPendingTraps() hook."""

            def RunPendingTraps(self):
                pass

        cmd_ev = CommandEvaluator()

        err_out = []
        lines = []
        for _ in range(2):
            self.fd_state.Push([r], err_out)
            try:
                line, _ = read_osh._ReadPortion(pyos.NEWLINE_CH, -1, cmd_ev)
            finally:
                # Always undo the redirect, so a failed read can't leave
                # fd 0 pointing at the temp file for the remaining tests.
                self.fd_state.Pop(err_out)
            lines.append(line)

        # sys.stdin.readline() would erroneously return 'two' because of
        # buffering.
        self.assertEqual('one', lines[0])
        self.assertEqual('one', lines[1])

    def testProcess(self):
        """Run one external command that exists and one that does not."""
        # 3 fds.  Does Python open it?  Shell seems to have it too.  Maybe it
        # inherits from the shell.
        print('FDS BEFORE', os.listdir('/dev/fd'))

        Banner('date')
        argv = ['date']
        p = self._ExtProc(argv)
        why = trace.External(argv)
        status = p.RunProcess(self.waiter, why)
        log('date returned %d', status)
        self.assertEqual(0, status)

        Banner('does-not-exist')
        argv = ['does-not-exist']
        p = self._ExtProc(argv)
        # Build a trace event for THIS command, rather than reusing the one
        # that was created for 'date' above.
        why = trace.External(argv)
        print(p.RunProcess(self.waiter, why))

        # 12 file descriptors open!
        print('FDS AFTER', os.listdir('/dev/fd'))

    def testPipeline(self):
        """Run ls | cut | sort, with 'uniq -c' as the last pipeline part."""
        node = _CommandNode('uniq -c', self.arena)
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)
        print('BEFORE', os.listdir('/dev/fd'))

        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        p.Add(self._ExtProc(['ls']))
        p.Add(self._ExtProc(['cut', '-d', '.', '-f', '2']))
        p.Add(self._ExtProc(['sort']))

        p.AddLast((cmd_ev, node))

        p.StartPipeline(self.waiter)
        pipe_status = p.RunLastPart(self.waiter, self.fd_state)
        log('pipe_status: %s', pipe_status)

        print('AFTER', os.listdir('/dev/fd'))

    def testPipeline2(self):
        """Run a pipeline of external processes, then one of sub-programs."""
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)

        Banner('ls | cut -d . -f 1 | head')
        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        p.Add(self._ExtProc(['ls']))
        p.Add(self._ExtProc(['cut', '-d', '.', '-f', '1']))

        node = _CommandNode('head', self.arena)
        p.AddLast((cmd_ev, node))

        p.StartPipeline(self.waiter)
        print(p.RunLastPart(self.waiter, self.fd_state))

        # Simulating subshell for each command
        nodes = [
            _CommandNode(code_str, self.arena)
            for code_str in ('ls', 'head', 'sort --reverse')
        ]
        thunks = [
            process.SubProgramThunk(cmd_ev, sub_node, self.trap_state,
                                    self.multi_trace, True, False)
            for sub_node in nodes
        ]

        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        for thunk in thunks:
            p.Add(Process(thunk, self.job_control, self.job_list,
                          self.tracer))

        last_thunk = (cmd_ev, _CommandNode('cat', self.arena))
        p.AddLast(last_thunk)

        p.StartPipeline(self.waiter)
        print(p.RunLastPart(self.waiter, self.fd_state))

        # TODO: Combine pipelines for other things:

        # echo foo 1>&2 | tee stdout.txt
        #
        # foo=$(ls | head)
        #
        # foo=$(<<EOF ls | head)
        # stdin
        # EOF
        #
        # ls | head &

        # Or technically we could fork the whole interpreter for foo|bar|baz
        # and capture stdout of that interpreter.

    def makeTestPipeline(self, jc):
        """Return an unstarted '/bin/echo testpipeline | cat' Pipeline.

        jc is the job control object handed to the Pipeline and to each of
        its two Processes.
        """
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)

        pi = process.Pipeline(False, jc, self.job_list, self.tracer)

        node1 = _CommandNode('/bin/echo testpipeline', self.arena)
        node2 = _CommandNode('cat', self.arena)

        thunk1 = process.SubProgramThunk(cmd_ev, node1, self.trap_state,
                                         self.multi_trace, True, False)
        thunk2 = process.SubProgramThunk(cmd_ev, node2, self.trap_state,
                                         self.multi_trace, True, False)

        pi.Add(Process(thunk1, jc, self.job_list, self.tracer))
        pi.Add(Process(thunk2, jc, self.job_list, self.tracer))

        return pi

    def testPipelinePgidField(self):
        """Check ProcessGroupId() with job control disabled, then enabled."""
        jc = FakeJobControl(False)

        pi = self.makeTestPipeline(jc)
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        pi.StartPipeline(self.waiter)
        # No pgid
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        jc = FakeJobControl(True)

        pi = self.makeTestPipeline(jc)
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        pi.StartPipeline(self.waiter)
        # first process is the process group leader
        self.assertEqual(pi.pids[0], pi.ProcessGroupId())

    def testOpen(self):
        """Disabled test: FdState.Open() should raise OSError on bad paths."""
        # Disabled because mycpp translation can't handle it.  We do this at a
        # higher layer.
        return

        # NOTE: everything below is intentionally unreachable while the test
        # is disabled.

        # This function used to raise BOTH OSError and IOError because Python
        # 2 is inconsistent.
        # We follow Python 3 in preferring OSError.
        # https://stackoverflow.com/questions/29347790/difference-between-ioerror-and-oserror
        self.assertRaises(OSError, self.fd_state.Open, '_nonexistent_')
        self.assertRaises(OSError, self.fd_state.Open, 'metrics/')
270
271
# Run every test in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
274
275# vim: sw=4