目录:
subprocess 模块用于执行外部命令和程序,提供了创建和管理子进程的高级接口,能够替代老旧的 os.system() 和 os.popen() 等方法。支持同步和异步执行,可以捕获命令的输出、错误以及返回状态码。
与 os.system() 相比,能够避免 shell 注入漏洞,更安全。
格式:
subprocess.run(args, *, stdin=None, input=None, stdout=None, stderr=None,
capture_output=False, shell=False, cwd=None, timeout=None,
check=False, encoding=None, errors=None, text=None,
env=None, universal_newlines=None)
参数:
import subprocess
result = subprocess.run(['ls', '-l'])
执行上面脚本,可以看到终端中打印当前目录下的所有文件和文件夹。
$ python3 ./test.py
total 1
-rw-r--r-- 1 dkvirus staff 3116 10月 3 21:14 test.py
-rw-r--r--@ 1 dkvirus staff 6457 2月 5 13:12 test.html
将 capture_output 参数设置为 True:
import subprocess
result = subprocess.run(['ls', '-l'], capture_output=True)
执行上面脚本,可以看到终端中不会打印任何内容,这是因为输出已被捕获,可以通过 result.stdout 属性获取输出内容。
$ python3 ./test.py
添加一行打印语句,打印 result.stdout:
import subprocess
result = subprocess.run(['ls', '-l'], capture_output=True)
print(result.stdout)
执行上面脚本,可以看到打印出来是字节数据,很不方便阅读。
$ python3 ./test.py
b'total 1\n-rw-r--r-- 1 dkvirus staff 3116 1\xe6\x9c\x88 30 22:32 test.py\n-rw-r--r--@ 1 dkvirus staff 6457 2\xe6\x9c\x88 5 13:12 test.html\n'
添加 text 参数,值设置为 True:
import subprocess
result = subprocess.run(['ls', '-l'], capture_output=True, text=True)
print(result.stdout)
执行上面脚本,可以看到打印出来是便于阅读的字符串格式:
$ python3 ./test.py
total 1
-rw-r--r-- 1 dkvirus staff 3116 10月 3 21:14 test.py
-rw-r--r--@ 1 dkvirus staff 6457 2月 5 13:12 test.html
import subprocess
# 创建子进程
process = subprocess.Popen(['ls', '-l'], stdout=subprocess.PIPE,
stderr=subprocess.PIPE, text=True)
# 等待进程完成并获取输出
stdout, stderr = process.communicate()
print(f"标准输出:\n{stdout}")
print(f"进程PID: {process.pid}")
print(f"返回码: {process.returncode}")
Popen对象的重要方法:
获取系统信息:
import subprocess
result = subprocess.run(['uname', '-a'], capture_output=True, text=True)
print(f"系统信息: {result.stdout}")
CentOS 服务器检查服务状态:
import subprocess
result = subprocess.run(['systemctl', 'status', 'nginx'],
capture_output=True, text=True)
if 'active (running)' in result.stdout:
print("Nginx正在运行")
执行 Python 脚本:
import subprocess
result = subprocess.run(['python', 'script.py', '--input', 'data.txt',
'--output', 'result.json'], capture_output=True, text=True)
执行 NodeJS 脚本:
import subprocess
result = subprocess.run(['node', 'app.js'], capture_output=True, text=True)
将前一个命令的输出作为下一个命令的输入,如下示例演示 find | grep | sort 这个管理的过程:
import subprocess
p1 = subprocess.Popen(['find', '.', '-name', '*.py'], stdout=subprocess.PIPE)
p2 = subprocess.Popen(['grep', '-v', 'test'], stdin=p1.stdout, stdout=subprocess.PIPE)
p3 = subprocess.Popen(['sort'], stdin=p2.stdout, stdout=subprocess.PIPE)
p1.stdout.close()
p2.stdout.close()
output = p3.communicate()
健壮的错误处理示例:
def safe_run_command(cmd, timeout=30):
try:
result = subprocess.run(cmd, capture_output=True, text=True,
timeout=timeout, check=True)
return True, result.stdout
except subprocess.CalledProcessError as e:
return False, f"命令失败(返回码{e.returncode}): {e.stderr}"
except subprocess.TimeoutExpired:
return False, "命令执行超时"
except FileNotFoundError:
return False, "命令或程序不存在"
except Exception as e:
return False, f"未知错误: {str(e)}"
↶ 返回首页 ↶