|
1 | 1 | import os |
2 | | -import subprocess |
3 | 2 | import synapse.tests.utils as s_test |
4 | 3 |
|
| 4 | +from prompt_toolkit.document import Document |
| 5 | +from prompt_toolkit.completion import Completion, CompleteEvent |
| 6 | + |
5 | 7 | import synapse.exc as s_exc |
6 | 8 | import synapse.common as s_common |
7 | 9 | import synapse.lib.output as s_output |
@@ -206,3 +208,117 @@ async def test_tools_storm_view(self): |
206 | 208 | outp = s_output.OutPutStr() |
207 | 209 | await s_t_storm.main(('--optsfile', optsfile, url, 'file:bytes'), outp=outp) |
208 | 210 | self.isin('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa', str(outp)) |
| 211 | + |
| 212 | + async def test_storm_tab_completion(self): |
| 213 | + class DummyStorm: |
| 214 | + def __init__(self, core): |
| 215 | + self.item = core |
| 216 | + |
| 217 | + async with self.getTestCore() as core: |
| 218 | + cli = DummyStorm(core) |
| 219 | + |
| 220 | + completer = s_t_storm.StormCompleter(cli) |
| 221 | + |
| 222 | + async def get_completions(text): |
| 223 | + document = Document(text) |
| 224 | + event = CompleteEvent(completion_requested=True) |
| 225 | + return await s_test.alist(completer.get_completions_async(document, event)) |
| 226 | + |
| 227 | + vals = await get_completions('') |
| 228 | + self.len(0, vals) |
| 229 | + |
| 230 | + # Check completion of forms/props |
| 231 | + vals = await get_completions('inet:fq') |
| 232 | + self.isin(Completion('dn', display='[form] inet:fqdn - A Fully Qualified Domain Name (FQDN).'), vals) |
| 233 | + self.isin(Completion('dn.seen', display='[prop] inet:fqdn.seen - The time interval for first/last observation of the node.'), vals) |
| 234 | + self.isin(Completion('dn.created', display='[prop] inet:fqdn.created - The time the node was created in the cortex.'), vals) |
| 235 | + self.isin(Completion('dn:domain', display='[prop] inet:fqdn:domain - The parent domain for the FQDN.'), vals) |
| 236 | + self.isin(Completion('dn:host', display='[prop] inet:fqdn:host - The host part of the FQDN.'), vals) |
| 237 | + self.isin(Completion('dn:issuffix', display='[prop] inet:fqdn:issuffix - True if the FQDN is considered a suffix.'), vals) |
| 238 | + self.isin(Completion('dn:iszone', display='[prop] inet:fqdn:iszone - True if the FQDN is considered a zone.'), vals) |
| 239 | + self.isin(Completion('dn:zone', display='[prop] inet:fqdn:zone - The zone level parent for this FQDN.'), vals) |
| 240 | + |
| 241 | + vals = await get_completions('inet:fqdn.') |
| 242 | + self.isin(Completion('seen', display='[prop] inet:fqdn.seen - The time interval for first/last observation of the node.'), vals) |
| 243 | + self.isin(Completion('created', display='[prop] inet:fqdn.created - The time the node was created in the cortex.'), vals) |
| 244 | + |
| 245 | + vals = await get_completions('[inet:fq') |
| 246 | + self.isin(Completion('dn', display='[form] inet:fqdn - A Fully Qualified Domain Name (FQDN).'), vals) |
| 247 | + self.isin(Completion('dn.seen', display='[prop] inet:fqdn.seen - The time interval for first/last observation of the node.'), vals) |
| 248 | + |
| 249 | + vals = await get_completions('[inet:') |
| 250 | + self.isin(Completion('fqdn', display='[form] inet:fqdn - A Fully Qualified Domain Name (FQDN).'), vals) |
| 251 | + self.isin(Completion('ipv4', display='[form] inet:ipv4 - An IPv4 address.'), vals) |
| 252 | + |
| 253 | + # No tags to return |
| 254 | + vals = await get_completions('inet:ipv4#') |
| 255 | + self.len(0, vals) |
| 256 | + |
| 257 | + # Add some tags |
| 258 | + await core.stormlist('[inet:ipv4=1.2.3.4 +#rep.foo]') |
| 259 | + await core.stormlist('[inet:ipv4=1.2.3.5 +#rep.foo.bar]') |
| 260 | + await core.stormlist('[inet:ipv4=1.2.3.6 +#rep.bar]') |
| 261 | + await core.stormlist('[inet:ipv4=1.2.3.7 +#rep.baz]') |
| 262 | + await core.stormlist('[syn:tag=rep :doc="Reputation base."]') |
| 263 | + |
| 264 | + # Check completion of tags |
| 265 | + vals = await get_completions('inet:ipv4#') |
| 266 | + self.len(4, vals) |
| 267 | + self.isin(Completion('rep', display='[tag] rep - Reputation base.'), vals) |
| 268 | + self.isin(Completion('rep.foo', display='[tag] rep.foo'), vals) |
| 269 | + self.isin(Completion('rep.bar', display='[tag] rep.bar'), vals) |
| 270 | + self.isin(Completion('rep.baz', display='[tag] rep.baz'), vals) |
| 271 | + |
| 272 | + vals = await get_completions('inet:ipv4#rep.') |
| 273 | + self.len(4, vals) |
| 274 | + self.isin(Completion('foo', display='[tag] rep.foo'), vals) |
| 275 | + self.isin(Completion('foo.bar', display='[tag] rep.foo.bar'), vals) |
| 276 | + self.isin(Completion('bar', display='[tag] rep.bar'), vals) |
| 277 | + self.isin(Completion('baz', display='[tag] rep.baz'), vals) |
| 278 | + |
| 279 | + vals = await get_completions('inet:ipv4 +#') |
| 280 | + self.isin(Completion('rep.foo', display='[tag] rep.foo'), vals) |
| 281 | + |
| 282 | + vals = await get_completions('inet:ipv4 -#') |
| 283 | + self.isin(Completion('rep.foo', display='[tag] rep.foo'), vals) |
| 284 | + |
| 285 | + vals = await get_completions('[inet:ipv4 +#') |
| 286 | + self.isin(Completion('rep.foo', display='[tag] rep.foo'), vals) |
| 287 | + |
| 288 | + vals = await get_completions('inet:ipv4 { +#') |
| 289 | + self.isin(Completion('rep.foo', display='[tag] rep.foo'), vals) |
| 290 | + |
| 291 | + # Check completion of cmds |
| 292 | + vals = await get_completions('vau') |
| 293 | + self.isin(Completion('lt.add', display='[cmd] vault.add - Add a vault.'), vals) |
| 294 | + self.isin(Completion('lt.set.secrets', display='[cmd] vault.set.secrets - Set vault secret data.'), vals) |
| 295 | + self.isin(Completion('lt.set.configs', display='[cmd] vault.set.configs - Set vault config data.'), vals) |
| 296 | + self.isin(Completion('lt.del', display='[cmd] vault.del - Delete a vault.'), vals) |
| 297 | + self.isin(Completion('lt.list', display='[cmd] vault.list - List available vaults.'), vals) |
| 298 | + self.isin(Completion('lt.set.perm', display='[cmd] vault.set.perm - Set permissions on a vault.'), vals) |
| 299 | + |
| 300 | + vals = await get_completions('inet:ipv4 +#rep.foo | ser') |
| 301 | + self.isin(Completion('vice.add', display='[cmd] service.add - Add a storm service to the cortex.'), vals) |
| 302 | + self.isin(Completion('vice.del', display='[cmd] service.del - Remove a storm service from the cortex.'), vals) |
| 303 | + self.isin(Completion('vice.list', display='[cmd] service.list - List the storm services configured in the cortex.'), vals) |
| 304 | + |
| 305 | + # Check completion of libs |
| 306 | + vals = await get_completions('inet:ipv4 $li') |
| 307 | + self.len(0, vals) |
| 308 | + |
| 309 | + vals = await get_completions('inet:ipv4 $lib') |
| 310 | + self.isin( |
| 311 | + Completion( |
| 312 | + '.auth.easyperm.allowed', |
| 313 | + display='[lib] $lib.auth.easyperm.allowed(edef: dict, level: str) - Check if the current user has a permission level in an easy perm dictionary.' |
| 314 | + ), |
| 315 | + vals |
| 316 | + ) |
| 317 | + |
| 318 | + self.isin( |
| 319 | + Completion( |
| 320 | + '.vault.list', |
| 321 | + display='[lib] $lib.vault.list() - List vaults accessible to the current user.' |
| 322 | + ), |
| 323 | + vals |
| 324 | + ) |
0 commit comments