OILS / builtin / pure_ysh.py View on Github | oilshell.org

228 lines, 143 significant
1"""
2builtin/pure_ysh.py - YSH builtins that don't do I/O.
3"""
4from __future__ import print_function
5
6from _devbuild.gen.runtime_asdl import cmd_value
7from _devbuild.gen.syntax_asdl import command_t, loc, loc_t
8from _devbuild.gen.value_asdl import value, value_e, value_t
9from core import error
10from core import state
11from core import vm
12from frontend import flag_util
13from frontend import typed_args
14from mycpp import mylib
15from mycpp.mylib import tagswitch, NewDict
16
17from typing import TYPE_CHECKING, cast, Any, Dict, List
18
19if TYPE_CHECKING:
20 from core import executor
21 from display import ui
22 from osh.cmd_eval import CommandEvaluator
23
24
class ctx_ShvarPathCache(object):
    """Clear the command lookup cache when a temporary PATH binding ends.

    Used by Shvar.Run().  It is a context manager so that the cache is also
    cleared when the block raises.
    """

    def __init__(self, search_path, enabled):
        # type: (executor.SearchPath, bool) -> None
        self.search_path = search_path
        self.enabled = enabled  # True only if PATH was one of the bindings

    def __enter__(self):
        # type: () -> None
        pass

    def __exit__(self, type, value, traceback):
        # type: (Any, Any, Any) -> None
        if self.enabled:
            # Locations cached while the temporary PATH was in effect must not
            # outlive it.
            self.search_path.ClearCache()


class Shvar(vm._Builtin):
    """The 'shvar' builtin: run a block with temporary name=value bindings.

    As parsed by Run(), the word arguments are NAME=value pairs and a block
    argument is required.
    """

    def __init__(self, mem, search_path, cmd_ev):
        # type: (state.Mem, executor.SearchPath, CommandEvaluator) -> None
        self.mem = mem
        self.search_path = search_path  # to clear PATH
        self.cmd_ev = cmd_ev  # To run blocks

    def Run(self, cmd_val):
        # type: (cmd_value.Argv) -> int
        """Bind each NAME=value as a string, then evaluate the block.

        Returns:
          Always 0; the status of the block is discarded.

        Raises:
          error.Usage: if there are no word arguments, or if split_once()
            yields no value part for an argument.
        """
        _, arg_r = flag_util.ParseCmdVal('shvar',
                                         cmd_val,
                                         accept_typed_args=True)

        # TODO: I think shvar LANG=C should just mutate
        # But should there be a whitelist?
        cmd_frag = typed_args.RequiredBlockAsFrag(cmd_val)

        vars = NewDict()  # type: Dict[str, value_t]
        args, arg_locs = arg_r.Rest2()
        if len(args) == 0:
            raise error.Usage('Expected name=value', loc.Missing)

        path_bound = False
        for i, arg in enumerate(args):
            name, s = mylib.split_once(arg, '=')
            if s is None:
                raise error.Usage('Expected name=value', arg_locs[i])
            v = value.Str(s)  # type: value_t
            vars[name] = v

            # Important fix: shvar PATH='' { } must make all binaries invisible
            if name == 'PATH':
                path_bound = True
                self.search_path.ClearCache()

        # The outer manager exits last, i.e. after ctx_Eval has finished with
        # the bindings, so anything cached under the temporary PATH is
        # forgotten once the block is done.
        with ctx_ShvarPathCache(self.search_path, path_bound):
            with state.ctx_Eval(self.mem, None, None, vars):
                unused = self.cmd_ev.EvalCommandFrag(cmd_frag)

        return 0
63
64
class ctx_Context(object):
    """Context manager used for: ctx push (context) { ... }

    The context is pushed onto mem's context stack by the constructor, and
    popped again when the 'with' block exits.
    """

    def __init__(self, mem, context):
        # type: (state.Mem, Dict[str, value_t]) -> None
        mem.PushContextStack(context)
        self.mem = mem  # kept so that __exit__ can pop

    def __enter__(self):
        # type: () -> None
        # Nothing to do: the push already happened in __init__
        pass

    def __exit__(self, type, value, traceback):
        # type: (Any, Any, Any) -> None
        self.mem.PopContextStack()
80
81
class Ctx(vm._Builtin):
    """The 'ctx' builtin.  Run() dispatches on the verbs push, set and emit."""

    def __init__(self, mem, cmd_ev):
        # type: (state.Mem, CommandEvaluator) -> None
        self.mem = mem
        self.cmd_ev = cmd_ev  # To run blocks

    def _GetContext(self):
        # type: () -> Dict[str, value_t]
        """Return mem.GetContext(), raising error.Expr if it is None."""
        current = self.mem.GetContext()
        if current is not None:
            return current

        raise error.Expr(
            "Could not find a context. Did you forget to 'ctx push'?",
            loc.Missing)

    def _Push(self, context, block):
        # type: (Dict[str, value_t], command_t) -> int
        """Evaluate block with context pushed; return the block's result."""
        with ctx_Context(self.mem, context):
            status = self.cmd_ev.EvalCommandFrag(block)
        return status

    def _Set(self, updates):
        # type: (Dict[str, value_t]) -> int
        """Merge updates into the current context.  Returns 0."""
        self._GetContext().update(updates)
        return 0

    def _Emit(self, field, item, blame):
        # type: (str, value_t, loc_t) -> int
        """Append item to the List stored under field in the current context.

        A missing field is first created as an empty List.  If the field holds
        something other than a List, error.TypeErr is raised, blaming the
        given location.  Returns 0.
        """
        current = self._GetContext()

        if field not in current:
            current[field] = value.List([])

        UP_target = current[field]
        if UP_target.tag() == value_e.List:
            target = cast(value.List, UP_target)
            target.items.append(item)
            return 0

        raise error.TypeErr(
            UP_target,
            "Expected the context item '%s' to be a List" % field, blame)

    def Run(self, cmd_val):
        # type: (cmd_value.Argv) -> int
        """Read the verb, then the arguments that verb expects.

        Raises error.Usage for an unknown verb.
        """
        rd = typed_args.ReaderForProc(cmd_val)
        _, arg_r = flag_util.ParseCmdVal('ctx',
                                         cmd_val,
                                         accept_typed_args=True)

        verb, verb_loc = arg_r.ReadRequired2(
            'Expected a verb (push, set, emit)')

        if verb == "push":
            # ctx push (context) { block }
            new_context = rd.PosDict()
            body = rd.RequiredBlockAsFrag()
            rd.Done()
            arg_r.AtEnd()
            return self._Push(new_context, body)

        if verb == "set":
            # All named typed args become updates
            named = rd.RestNamed()
            rd.Done()
            arg_r.AtEnd()
            return self._Set(named)

        if verb == "emit":
            # A word arg names the field; one typed arg is the item
            field, field_loc = arg_r.ReadRequired2(
                "A target field is required")
            emitted = rd.PosValue()
            rd.Done()
            arg_r.AtEnd()
            return self._Emit(field, emitted, field_loc)

        raise error.Usage("Unknown verb '%s'" % verb, verb_loc)
163
164
class PushRegisters(vm._Builtin):
    """The 'push-registers' builtin: run a block inside state.ctx_Registers."""

    def __init__(self, mem, cmd_ev):
        # type: (state.Mem, CommandEvaluator) -> None
        self.mem = mem
        self.cmd_ev = cmd_ev  # To run blocks

    def Run(self, cmd_val):
        # type: (cmd_value.Argv) -> int
        """Evaluate the required block argument under ctx_Registers.

        The block's own result is ignored; the return value is the last entry
        of mem.last_status, read after ctx_Registers has exited.
        """
        _, arg_r = flag_util.ParseCmdVal('push-registers',
                                         cmd_val,
                                         accept_typed_args=True)

        block = typed_args.RequiredBlockAsFrag(cmd_val)

        with state.ctx_Registers(self.mem):
            unused = self.cmd_ev.EvalCommandFrag(block)

        # make it "SILENT" in terms of not mutating $?
        # TODO: Revisit this.  It might be better to provide the headless shell
        # with a way to SET $? instead.  Needs to be tested/prototyped.
        status_stack = self.mem.last_status
        return status_stack[-1]
187
188
class Append(vm._Builtin):
    """Append word args to a List or a BashArray.

    The target is passed as a typed arg.  The appended values are not typed
    args, since you can already do

    :: mylist->append(42)
    """

    def __init__(self, mem, errfmt):
        # type: (state.Mem, ui.ErrorFormatter) -> None
        self.mem = mem
        self.errfmt = errfmt

    def Run(self, cmd_val):
        # type: (cmd_value.Argv) -> int
        """Extend the target with the remaining word args.  Returns 0.

        For a BashArray the strings are added as-is; for a List each string is
        wrapped in value.Str.  Any other target raises error.TypeErr.
        """

        # This means we ignore -- , which is consistent
        _, arg_r = flag_util.ParseCmdVal('append',
                                         cmd_val,
                                         accept_typed_args=True)

        rd = typed_args.ReaderForProc(cmd_val)
        val = rd.PosValue()
        rd.Done()

        UP_val = val
        with tagswitch(val) as case:
            if case(value_e.BashArray):
                val = cast(value.BashArray, UP_val)
                val.strs.extend(arg_r.Rest())
            elif case(value_e.List):
                val = cast(value.List, UP_val)
                new_items = []  # type: List[value_t]
                for word in arg_r.Rest():
                    new_items.append(value.Str(word))
                val.items.extend(new_items)
            else:
                raise error.TypeErr(val, 'expected List or BashArray',
                                    loc.Missing)

        return 0