OILS / core / process_test.py View on Github | oilshell.org

277 lines, 178 significant
1#!/usr/bin/env python2
2"""process_test.py: Tests for process.py."""
3
4import os
5import unittest
6
7from _devbuild.gen.id_kind_asdl import Id
8from _devbuild.gen.runtime_asdl import (RedirValue, redirect_arg, cmd_value,
9 trace)
10from _devbuild.gen.syntax_asdl import loc, redir_loc
11from asdl import runtime
12from builtin import read_osh
13from builtin import trap_osh
14from core import dev
15from core import process # module under test
16from core import pyos
17from core import sh_init
18from core import state
19from core import test_lib
20from core import util
21from display import ui
22from mycpp import iolib
23from mycpp import mylib
24from mycpp.mylib import log
25
26import posix_ as posix
27
# Short module-level aliases for the two classes from the module under test
# that the helpers below instantiate directly.
ExternalThunk = process.ExternalThunk
Process = process.Process
30
31
def Banner(msg):
    """Print a 60-character dashed rule followed by msg on its own line."""
    rule = '-' * 60
    for text in (rule, msg):
        print(text)
35
36
def _CommandNode(code_str, arena):
    """Parse code_str with a test command parser and return the parsed node.

    The parser is created by test_lib.InitCommandParser over the given arena;
    the return value is whatever its ParseLogicalLine() produces.
    """
    parser = test_lib.InitCommandParser(code_str, arena=arena)
    node = parser.ParseLogicalLine()
    return node
40
41
class FakeJobControl(object):
    """Test double whose Enabled() answer is fixed at construction time."""

    def __init__(self, enabled):
        # Stored as-is; Enabled() hands the same value back.
        self.enabled = enabled

    def Enabled(self):
        """Return the flag that was passed to the constructor."""
        flag = self.enabled
        return flag
49
50
class ProcessTest(unittest.TestCase):
    """Tests for Process, Pipeline and FdState from core/process.py.

    Several tests start real external programs (date, ls, cut, sort, ...) and
    print diagnostics such as the contents of /dev/fd rather than asserting on
    them.
    """

    def setUp(self):
        """Build the process-related fixtures shared by every test method."""
        self.arena = test_lib.MakeArena('process_test.py')

        mem = state.Mem('', [], self.arena, [], {})
        # The first element (parse options) is not needed by these tests.
        _, exec_opts, mutable_opts = state.MakeOpts(mem, {}, None)
        mem.exec_opts = exec_opts

        #state.InitMem(mem, {}, '0.1')
        sh_init.InitDefaultVars(mem)

        self.job_control = process.JobControl()
        self.job_list = process.JobList()

        signal_safe = iolib.InitSignalSafe()
        self.trap_state = trap_osh.TrapState(signal_safe)

        fd_state = None
        self.multi_trace = dev.MultiTracer(posix.getpid(), '', '', '',
                                           fd_state)
        self.tracer = dev.Tracer(None, exec_opts, mutable_opts, mem,
                                 mylib.Stderr(), self.multi_trace)
        self.waiter = process.Waiter(self.job_list, exec_opts,
                                     self.trap_state, self.tracer)
        errfmt = ui.ErrorFormatter()
        self.fd_state = process.FdState(errfmt, self.job_control,
                                        self.job_list, None, self.tracer,
                                        None, exec_opts)
        self.ext_prog = process.ExternalProgram('', self.fd_state, errfmt,
                                                util.NullDebugFile())

    def _ExtProc(self, argv):
        """Return a Process that runs the external command argv.

        argv[0] is looked up in /bin and then /usr/bin.  If it is found in
        neither, the bare name is used as the path, which exercises the
        failure case.
        """
        arg_vec = cmd_value.Argv(argv, [loc.Missing] * len(argv), False, None,
                                 None)
        argv0_path = None
        for path_entry in ['/bin', '/usr/bin']:
            full_path = os.path.join(path_entry, argv[0])
            if os.path.exists(full_path):
                argv0_path = full_path
                break
        if not argv0_path:
            argv0_path = argv[0]  # fallback that tests failure case
        thunk = ExternalThunk(self.ext_prog, argv0_path, arg_vec, {})
        return Process(thunk, self.job_control, self.job_list, self.tracer)

    def _ReadLineWithRedirect(self, redir, cmd_ev, err_out):
        """Push redir, read one line from stdin, and always Pop again.

        The Pop() sits in a finally clause so that a failing read cannot leave
        the redirect frame pushed for the tests that run afterwards.
        """
        self.fd_state.Push([redir], err_out)
        try:
            line, _ = read_osh._ReadPortion(pyos.NEWLINE_CH, -1, cmd_ev)
        finally:
            self.fd_state.Pop(err_out)
        return line

    def testStdinRedirect(self):
        """Redirecting stdin from the same file twice reads line one twice."""
        path = '_tmp/one-two.txt'
        # Write two lines
        with open(path, 'w') as f:
            f.write('one\ntwo\n')

        # Should get the first line twice, because Pop() closes it!

        r = RedirValue(Id.Redir_Less, runtime.NO_SPID, redir_loc.Fd(0),
                       redirect_arg.Path(path))

        class CommandEvaluator(object):
            """Minimal stand-in: _ReadPortion is handed this as cmd_ev."""

            def RunPendingTraps(self):
                pass

        cmd_ev = CommandEvaluator()

        err_out = []
        line1 = self._ReadLineWithRedirect(r, cmd_ev, err_out)
        line2 = self._ReadLineWithRedirect(r, cmd_ev, err_out)

        # sys.stdin.readline() would erroneously return 'two' because of
        # buffering.
        self.assertEqual('one', line1)
        self.assertEqual('one', line2)

    def testProcess(self):
        """Run one command that exists and one that does not."""
        # 3 fds.  Does Python open it?  Shell seems to have it too.  Maybe it
        # inherits from the shell.
        print('FDS BEFORE', os.listdir('/dev/fd'))

        Banner('date')
        argv = ['date']
        p = self._ExtProc(argv)
        why = trace.External(argv)
        status = p.RunProcess(self.waiter, why)
        log('date returned %d', status)
        self.assertEqual(0, status)

        Banner('does-not-exist')
        # Build a trace for THIS command instead of reusing the one for 'date'.
        argv = ['does-not-exist']
        p = self._ExtProc(argv)
        why = trace.External(argv)
        print(p.RunProcess(self.waiter, why))

        # 12 file descriptors open!
        print('FDS AFTER', os.listdir('/dev/fd'))

    def testPipeline(self):
        """Run ls | cut -d . -f 2 | sort | uniq -c, last part in-process."""
        node = _CommandNode('uniq -c', self.arena)
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)
        print('BEFORE', os.listdir('/dev/fd'))

        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        p.Add(self._ExtProc(['ls']))
        p.Add(self._ExtProc(['cut', '-d', '.', '-f', '2']))
        p.Add(self._ExtProc(['sort']))

        p.AddLast((cmd_ev, node))

        p.StartPipeline(self.waiter)
        pipe_status = p.RunLastPart(self.waiter, self.fd_state)
        log('pipe_status: %s', pipe_status)

        print('AFTER', os.listdir('/dev/fd'))

    def testPipeline2(self):
        """Run a pipeline of external processes, then one of sub-programs."""
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)

        Banner('ls | cut -d . -f 1 | head')
        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        p.Add(self._ExtProc(['ls']))
        p.Add(self._ExtProc(['cut', '-d', '.', '-f', '1']))

        node = _CommandNode('head', self.arena)
        p.AddLast((cmd_ev, node))

        p.StartPipeline(self.waiter)
        print(p.RunLastPart(self.waiter, self.fd_state))

        # Simulating subshell for each command
        node1 = _CommandNode('ls', self.arena)
        node2 = _CommandNode('head', self.arena)
        node3 = _CommandNode('sort --reverse', self.arena)

        thunk1 = process.SubProgramThunk(cmd_ev, node1, self.trap_state,
                                         self.multi_trace, True, False)
        thunk2 = process.SubProgramThunk(cmd_ev, node2, self.trap_state,
                                         self.multi_trace, True, False)
        thunk3 = process.SubProgramThunk(cmd_ev, node3, self.trap_state,
                                         self.multi_trace, True, False)

        p = process.Pipeline(False, self.job_control, self.job_list,
                             self.tracer)
        p.Add(Process(thunk1, self.job_control, self.job_list, self.tracer))
        p.Add(Process(thunk2, self.job_control, self.job_list, self.tracer))
        p.Add(Process(thunk3, self.job_control, self.job_list, self.tracer))

        last_thunk = (cmd_ev, _CommandNode('cat', self.arena))
        p.AddLast(last_thunk)

        p.StartPipeline(self.waiter)
        print(p.RunLastPart(self.waiter, self.fd_state))

        # TODO: Combine pipelines for other things:

        # echo foo 1>&2 | tee stdout.txt
        #
        # foo=$(ls | head)
        #
        # foo=$(<<EOF ls | head)
        # stdin
        # EOF
        #
        # ls | head &

        # Or technically we could fork the whole interpreter for foo|bar|baz
        # and capture stdout of that interpreter.

    def makeTestPipeline(self, jc):
        """Return an unstarted two-part pipeline that uses job control jc.

        The parts are sub-programs for '/bin/echo testpipeline' and 'cat'.
        """
        cmd_ev = test_lib.InitCommandEvaluator(arena=self.arena,
                                               ext_prog=self.ext_prog)

        pi = process.Pipeline(False, jc, self.job_list, self.tracer)

        node1 = _CommandNode('/bin/echo testpipeline', self.arena)
        node2 = _CommandNode('cat', self.arena)

        thunk1 = process.SubProgramThunk(cmd_ev, node1, self.trap_state,
                                         self.multi_trace, True, False)
        thunk2 = process.SubProgramThunk(cmd_ev, node2, self.trap_state,
                                         self.multi_trace, True, False)

        pi.Add(Process(thunk1, jc, self.job_list, self.tracer))
        pi.Add(Process(thunk2, jc, self.job_list, self.tracer))

        return pi

    def testPipelinePgidField(self):
        """ProcessGroupId() is only set once started with job control on."""
        jc = FakeJobControl(False)

        pi = self.makeTestPipeline(jc)
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        pi.StartPipeline(self.waiter)
        # No pgid
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        jc = FakeJobControl(True)

        pi = self.makeTestPipeline(jc)
        self.assertEqual(process.INVALID_PGID, pi.ProcessGroupId())

        pi.StartPipeline(self.waiter)
        # first process is the process group leader
        self.assertEqual(pi.pids[0], pi.ProcessGroupId())

    # Disabled because mycpp translation can't handle it.  We do this at a
    # higher layer.  Marked as skipped (rather than returning early) so the
    # runner reports that the assertions below are not being exercised.
    @unittest.skip("mycpp translation can't handle it; done at a higher layer")
    def testOpen(self):
        """FdState.Open should raise OSError for paths it cannot open."""
        # This function used to raise BOTH OSError and IOError because Python
        # 2 is inconsistent.
        # We follow Python 3 in preferring OSError.
        # https://stackoverflow.com/questions/29347790/difference-between-ioerror-and-oserror
        self.assertRaises(OSError, self.fd_state.Open, '_nonexistent_')
        self.assertRaises(OSError, self.fd_state.Open, 'metrics/')
272
273
# Script entry point: hand control to the unittest runner, which discovers
# the TestCase classes defined in this module.
if '__main__' == __name__:
    unittest.main()
276
277# vim: sw=4