A-A+

Python asyncio Network Capture TCPdump

2022年10月26日 16:00 汪洋大海 暂无评论 共5887字 (阅读830 views次)

Python 3 有一个很棒的库,叫做asyncio可用于在从 Web 服务器到数据库甚至分布式任务队列的任何事物上编写并发代码。Asyncio 是从非阻塞编程模型创建的,该模型为您的程序提供了在数据可用时委派数据的灵活性,而不是等待接收数据。这可以防止程序中的代码出现瓶颈,并允许同时运行数据任务以从程序的 I/O 中获得更高的性能和处理能力。我在描述 asyncio 时经常听到的一个问题是,“使用 asyncio 编写程序与编写普通的 Python 3 代码有何不同?” 这就是我想写这个教程的原因,描述使用 asyncio 编写 Python 程序的样子,并提供一些代码示例,说明我如何在我正在编写的 Python 模块中使用 asyncio。  

注意:  本教程中的代码是在带有 Python 3.8 开发分支的 Ubuntu Linux、16.04 和 18.04 上编写和测试的。Python 3.7.* 应该也能正常工作,只是还没有经过测试。 

 

网络捕获模块 🚀

几周前,我正在构建Network Capture,这是一个 Python 模块,作为 TCPDump 的包装器编写。网络捕获允许您编排数据包捕获并分析网络流量以实现基于文本的高级过滤功能。在架构和规划阶段,我需要一种快速的方法来显示和分析来自 STDOUT 的数据包捕获数据。我的第一个想法是使用 `subprocess.Popen` 来读取来自捕获的行,如下所示:

capture = subprocess.Popen([self.capture_command], 
					shell=True, 
					stdout=subprocess.PIPE)
 
captured_line = capture.stdout.readline()
 
if captured_line is not b'':
	# 在此处分析并过滤捕获的线条

使用 Python 异步库 🐍

这种方法有效,但它的性能不如我希望的那样好,因为有可能通过 STDOUT 传递大量数据。这种方法在使用 Python 时也有点难以终止和控制 TCPDump 进程的关闭。所以那时我求助于 Python 的 asyncio 库来帮助我同时处理来自 STDOUT 的数据并使用 Python 安全地关闭 TCPDump。让我们看看我是如何做到这一点的。

首先,我需要一个事件循环。在下面的代码示例中,我请求绑定到正在运行网络捕获的主线程的事件循环,但不是当前事件循环网络捕获正在运行它的程序代码。这是事件循环和释放主线程以调度执行而不阻塞网络捕获的事件循环之间的明显区别。所以现在网络捕获有一个事件循环,让我们看一下该循环的生命周期,为此,我们需要检查循环是如何开始和结束的。网络捕获模块的用户体验是,用户将从命令行执行捕获并运行,直到他们强制一个 KeyboardInterrupt 异常中断执行并关闭事件循环。因此,考虑到这次经历,

if __name__ == '__main__':
	# 创建一个新的网络捕获对象并传入系统参数
	nc = network_capture(sys.argv)
	# 创建一个新的运行循环
	eventloop = asyncio.get_event_loop()
	try:
		# 执行捕获直到完成.
		eventloop.run_until_complete(nc.dispatch_capture())
	except KeyboardInterrupt as e:
		print("-- Exiting due to keyboard interrupt --")
	finally:
		eventloop.close()

接下来我们看一下事件循环的run_until_complete方法调用中的逻辑。在这个方法中,我传递了 Network Capture 类中的 dispatch_capture() 方法。这告诉事件循环运行这个协程直到完成,或者在这种情况下,直到引发异常。请注意,在下面的代码中,dispatch_capture、capture_process、capture_read_bytes 和 kill_process 方法都是使用 async 关键字定义的。这意味着这些方法都是 Python 程序中的协程,当您使用 async 关键字时,您可以在调用这些方法之一时使用 await 关键字告诉事件循环等待该协程的未来返回。使用 await 关键字意味着您正在定义一个默认情况下不是线程安全的未来可等待对象。请注意,所有的 capture_process,分配有 await 关键字的期货。等待协程的返回将在可用时提供结果,回到那个异步编程模型,即未来或调用者在数据准备好时接收数据,但不等待该数据并阻止程序的执行。

这个包含对 capture_process 和 capture_read_bytes 调用的循环是异步输入/输出的完美示例,因为这是从 TCPDump 捕获接收数据的程序的实际部分。一个好的测试是使用 asyncio 并仅使用直接的串行 Python 循环来测量来自 capture_read_bytes 的输入/输出的响应。

class network_capture(object):
 
	__slots__ = ('keywords', 
		     'capture_cmd',
		     'pcap_file', 
		     'txt_file',
		     'validation_values',
		     'validation_map')
 
	# 构造函数
	def __init__(self, *args):
		...
 
	# 异步方法开始捕获.
	async def dispatch_capture(self):
 
		print("Capturing command: {0}".format(self.capture_cmd))
		print("Capturing keywords: {0}".format(self.keywords))
		line_count = 0
		capture_pid = None
		capture_stdout = None
		keyword_filtering = False
 
 
		if len(self.keywords) > 0:
			keyword_filtering = True
 
		# Open a file and start a read/write context
		with open(self.txt_file, 'w') as txt_file_obj:
 
			try:
				# 只是为了确保您不需要输入密码.
				# 设置文件模式为全读/写
				os.system("chmod 777 " + self.txt_file)
 
				print("-- Start Capturing Network Traffic --")
 
				capture_pid = await self.capture_process()
				while True:
					captured_line = await self.capture_read_bytes(capture_pid)
 
					if captured_line is not None and captured_line != b'':
 
						captured_line = captured_line.decode("utf-8")
						print("{0} \n".format(captured_line))
 
						if keyword_filtering:
							if any(key in captured_line for key in self.keywords):
								print("** Keyword found. Writing to log **")
								txt_file_obj.write(captured_line + "\n")
								# Removing this until this works
								line_count += 1
						else:
							txt_file_obj.write(captured_line + "\n")
							line_count += 1
 
			except OSError as err:
				print("-- Exiting due to an operating system failure --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Error: {0}".format(err))
				sys.exit(0)
			except AttributeError as err:
				print("-- Exiting due to an AttributeError --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Error: {0}".format(err))
			except:
				print("-- Unexpected excpetion received --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Errors: {0}".format(sys.exc_info()[0]))
				sys.exit(0)
			finally:
				txt_file_obj.close()
				await self.kill_process(capture_pid)
 
	# 异步方法来执行 tcpdump 命令并将它们通过管道传回
	# 等待的 pid.
	async def capture_process(self):
		return await asyncio.create_subprocess_shell(self.capture_cmd, 
												  stdout=asyncio.subprocess.PIPE,
												  stderr=asyncio.subprocess.PIPE)
 
	# Async方法从stdout读取一行并将其返回给等待的调用者。该行已格式化、打印和评估.
	async def capture_read_bytes(self, capture_pid):
		return await capture_pid.stdout.readline()
 
	# 从dispatch_capture执行异步方法以获取kill并等待进程终止.
	async def kill_process(self, capture_pid):
		capture_pid.kill()
		await capture_pid.wait()

现在我已经讨论了协程和期货如何工作的一个非常基本的示例,让我们来谈谈如何终止事件循环的使用。如前所述,使用并尝试安全地关闭 TCPDump 进程已被证明是具有挑战性的。我发现使用 asyncio 来杀死 pid(进程 ID)并等待它完成的未来返回是一个更安全、更清洁的选择。例如,在下面的代码中,程序捕获在捕获逻辑上运行的错误并等待 kill_process 的返回。在 kill_process 中,会发送一个信号来终止 pid 并等待直到完成。从那里捕获外部 KeyboardInterrupt 异常,并且此 catch 块最终关闭运行您的异步调度的事件循环。

class network_capture(object):
	# 开始捕获的异步方法.
	async def dispatch_capture(self):
		# Open a file and start a read/write context
		with open(self.txt_file, 'w') as txt_file_obj:
 
			try:
				...
			except OSError as err:
				print("-- Exiting due to an operating system failure --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Error: {0}".format(err))
				sys.exit(0)
			except AttributeError as err:
				print("-- Exiting due to an AttributeError --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Error: {0}".format(err))
			except:
				print("-- Unexpected excpetion received --")
				print("-- {0} lines captured in your filter --"
						.format(line_count))
				print("Errors: {0}".format(sys.exc_info()[0]))
				sys.exit(0)
			finally:
				txt_file_obj.close()
				# 1) kill_process is awaited
				await self.kill_process(capture_pid)
 
	# 从dispatch_capture执行异步方法以获取kill并等待进程终止.
	async def kill_process(self, capture_pid):
		# Signal is sent to kill the pid
		capture_pid.kill()
		await capture_pid.wait()
 
if __name__ == '__main__':
	# 创建一个新的网络捕获对象并传入系统参数
	nc = network_capture(sys.argv)
	# 创建一个新的运行循环
	eventloop = asyncio.get_event_loop()
	try:
		# 执行捕获直到完成.
		eventloop.run_until_complete(nc.dispatch_capture())
	except KeyboardInterrupt as e:
		print("-- Exiting due to keyboard interrupt --")
	finally:
		# 最后事件循环关闭
		eventloop.close()

总结 ⌛️

总之,使用 asyncio 是一种非常简单的方式,可以在没有大量开销的情况下向程序添加并发性。Python 的 asyncio 库在过去一年左右的时间里取得了很大的进展,从外观上看,事情只会变得更强大。希望现在您已经了解了更多关于 Python 的 asyncio 以及如何将其合并到您的程序中的知识。如果您有任何问题、意见或疑虑,请告诉我,我很乐意听取您的意见。本教程的完整代码在我的 Github上。

文章来源:https://www.agnosticdev.com/content/how-use-python-asyncio-library

布施恩德可便相知重

微信扫一扫打赏

支付宝扫一扫打赏

×

给我留言