|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | import argparse |
4 | | -import asyncio |
5 | 4 | import os |
6 | 5 | import signal |
7 | 6 | import sys |
8 | 7 | import threading |
9 | | -from typing import Any, Set |
10 | 8 |
|
11 | | -from .exceptions import ConnectionClosed |
12 | | -from .frames import Close |
13 | | -from .legacy.client import connect |
| 9 | + |
| 10 | +try: |
| 11 | + import readline # noqa |
| 12 | +except ImportError: # Windows has no `readline` normally |
| 13 | + pass |
| 14 | + |
| 15 | +from .sync.client import ClientConnection, connect |
14 | 16 | from .version import version as websockets_version |
15 | 17 |
|
16 | 18 |
|
@@ -46,21 +48,6 @@ def win_enable_vt100() -> None: |
46 | 48 | raise RuntimeError("unable to set console mode") |
47 | 49 |
|
48 | 50 |
|
49 | | -def exit_from_event_loop_thread( |
50 | | - loop: asyncio.AbstractEventLoop, |
51 | | - stop: asyncio.Future[None], |
52 | | -) -> None: |
53 | | - loop.stop() |
54 | | - if not stop.done(): |
55 | | - # When exiting the thread that runs the event loop, raise |
56 | | - # KeyboardInterrupt in the main thread to exit the program. |
57 | | - if sys.platform == "win32": |
58 | | - ctrl_c = signal.CTRL_C_EVENT |
59 | | - else: |
60 | | - ctrl_c = signal.SIGINT |
61 | | - os.kill(os.getpid(), ctrl_c) |
62 | | - |
63 | | - |
64 | 51 | def print_during_input(string: str) -> None: |
65 | 52 | sys.stdout.write( |
66 | 53 | # Save cursor position |
@@ -93,63 +80,20 @@ def print_over_input(string: str) -> None: |
93 | 80 | sys.stdout.flush() |
94 | 81 |
|
95 | 82 |
|
96 | | -async def run_client( |
97 | | - uri: str, |
98 | | - loop: asyncio.AbstractEventLoop, |
99 | | - inputs: asyncio.Queue[str], |
100 | | - stop: asyncio.Future[None], |
101 | | -) -> None: |
102 | | - try: |
103 | | - websocket = await connect(uri) |
104 | | - except Exception as exc: |
105 | | - print_over_input(f"Failed to connect to {uri}: {exc}.") |
106 | | - exit_from_event_loop_thread(loop, stop) |
107 | | - return |
108 | | - else: |
109 | | - print_during_input(f"Connected to {uri}.") |
110 | | - |
111 | | - try: |
112 | | - while True: |
113 | | - incoming: asyncio.Future[Any] = asyncio.create_task(websocket.recv()) |
114 | | - outgoing: asyncio.Future[Any] = asyncio.create_task(inputs.get()) |
115 | | - done: Set[asyncio.Future[Any]] |
116 | | - pending: Set[asyncio.Future[Any]] |
117 | | - done, pending = await asyncio.wait( |
118 | | - [incoming, outgoing, stop], return_when=asyncio.FIRST_COMPLETED |
119 | | - ) |
120 | | - |
121 | | - # Cancel pending tasks to avoid leaking them. |
122 | | - if incoming in pending: |
123 | | - incoming.cancel() |
124 | | - if outgoing in pending: |
125 | | - outgoing.cancel() |
126 | | - |
127 | | - if incoming in done: |
128 | | - try: |
129 | | - message = incoming.result() |
130 | | - except ConnectionClosed: |
131 | | - break |
132 | | - else: |
133 | | - if isinstance(message, str): |
134 | | - print_during_input("< " + message) |
135 | | - else: |
136 | | - print_during_input("< (binary) " + message.hex()) |
137 | | - |
138 | | - if outgoing in done: |
139 | | - message = outgoing.result() |
140 | | - await websocket.send(message) |
141 | | - |
142 | | - if stop in done: |
143 | | - break |
144 | | - |
145 | | - finally: |
146 | | - await websocket.close() |
147 | | - assert websocket.close_code is not None and websocket.close_reason is not None |
148 | | - close_status = Close(websocket.close_code, websocket.close_reason) |
149 | | - |
150 | | - print_over_input(f"Connection closed: {close_status}.") |
151 | | - |
152 | | - exit_from_event_loop_thread(loop, stop) |
| 83 | +def print_incoming_messages(websocket: ClientConnection, stop: threading.Event) -> None: |
| 84 | + for message in websocket: |
| 85 | + if isinstance(message, str): |
| 86 | + print_during_input("< " + message) |
| 87 | + else: |
| 88 | + print_during_input("< (binary) " + message.hex()) |
| 89 | + if not stop.is_set(): |
| 90 | + # When the server closes the connection, raise KeyboardInterrupt |
| 91 | + # in the main thread to exit the program. |
| 92 | + if sys.platform == "win32": |
| 93 | + ctrl_c = signal.CTRL_C_EVENT |
| 94 | + else: |
| 95 | + ctrl_c = signal.SIGINT |
| 96 | + os.kill(os.getpid(), ctrl_c) |
153 | 97 |
|
154 | 98 |
|
155 | 99 | def main() -> None: |
@@ -184,47 +128,32 @@ def main() -> None: |
184 | 128 | sys.stderr.flush() |
185 | 129 |
|
186 | 130 | try: |
187 | | - import readline # noqa |
188 | | - except ImportError: # Windows has no `readline` normally |
189 | | - pass |
190 | | - |
191 | | - # Create an event loop that will run in a background thread. |
192 | | - loop = asyncio.new_event_loop() |
193 | | - |
194 | | - # Due to zealous removal of the loop parameter in the Queue constructor, |
195 | | - # we need a factory coroutine to run in the freshly created event loop. |
196 | | - async def queue_factory() -> asyncio.Queue[str]: |
197 | | - return asyncio.Queue() |
198 | | - |
199 | | - # Create a queue of user inputs. There's no need to limit its size. |
200 | | - inputs: asyncio.Queue[str] = loop.run_until_complete(queue_factory()) |
201 | | - |
202 | | - # Create a stop condition when receiving SIGINT or SIGTERM. |
203 | | - stop: asyncio.Future[None] = loop.create_future() |
| 131 | + websocket = connect(args.uri) |
| 132 | + except Exception as exc: |
| 133 | + print(f"Failed to connect to {args.uri}: {exc}.") |
| 134 | + sys.exit(1) |
| 135 | + else: |
| 136 | + print(f"Connected to {args.uri}.") |
204 | 137 |
|
205 | | - # Schedule the task that will manage the connection. |
206 | | - loop.create_task(run_client(args.uri, loop, inputs, stop)) |
| 138 | + stop = threading.Event() |
207 | 139 |
|
208 | | - # Start the event loop in a background thread. |
209 | | - thread = threading.Thread(target=loop.run_forever) |
| 140 | + # Start the thread that reads messages from the connection. |
| 141 | + thread = threading.Thread(target=print_incoming_messages, args=(websocket, stop)) |
210 | 142 | thread.start() |
211 | 143 |
|
212 | 144 | # Read from stdin in the main thread in order to receive signals. |
213 | 145 | try: |
214 | 146 | while True: |
215 | 147 | # Since there's no size limit, put_nowait is identical to put. |
216 | 148 | message = input("> ") |
217 | | - loop.call_soon_threadsafe(inputs.put_nowait, message) |
| 149 | + websocket.send(message) |
218 | 150 | except (KeyboardInterrupt, EOFError): # ^C, ^D |
219 | | - loop.call_soon_threadsafe(stop.set_result, None) |
| 151 | + stop.set() |
| 152 | + websocket.close() |
| 153 | + print_over_input("Connection closed.") |
220 | 154 |
|
221 | | - # Wait for the event loop to terminate. |
222 | 155 | thread.join() |
223 | 156 |
|
224 | | - # For reasons unclear, even though the loop is closed in the thread, |
225 | | - # it still thinks it's running here. |
226 | | - loop.close() |
227 | | - |
228 | 157 |
|
229 | 158 | if __name__ == "__main__": |
230 | 159 | main() |
0 commit comments