
How to Use Paramiko's SSH in Python

2025-04-01 Update From: SLTechnology News&Howtos

Shulou (Shulou.com) 06/03 Report

This article explains in detail how to use Paramiko's SSH client in Python. The approach is practical, so it is shared here as a reference; hopefully you will take something useful from it.

Description

1. Write all the device information to a text document. A plain txt file is enough to build a dictionary of login information for each device.

2. Initialize the SSH connection and execute commands.

3. Parse the output of the commands this task requires, and store the results in a file.

4. Add multi-threaded execution to improve efficiency.

5. Add a Linux crontab entry on the server so that the information is collected every hour.
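Step 1 can be tried on its own before any device is involved. The following is a minimal sketch, assuming whitespace-separated fields in the order ip, username, password, alias. The helper name parse_device_lines, the skipping of blank and "#" comment lines, and the error message are assumptions added for illustration; they are not part of the original script.

```python
def parse_device_lines(lines):
    """Turn 'ip username password alias' lines into a list of dicts.

    Hypothetical helper: blank lines and lines starting with '#' are
    skipped, and a malformed line raises ValueError with its line number.
    """
    fields = ("ip", "username", "password", "name")
    devices = []
    for lineno, raw in enumerate(lines, start=1):
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        parts = line.split()
        if len(parts) != len(fields):
            raise ValueError(
                "line %d: expected %d fields, got %d"
                % (lineno, len(fields), len(parts))
            )
        devices.append(dict(zip(fields, parts)))
    return devices


# Example input in the format described in step 1.
sample = [
    "1.1.1.1 admin admin sw1",
    "",
    "# spare switch, not yet racked",
    "1.1.1.2 admin admin sw2",
]
for device in parse_device_lines(sample):
    print(device["name"], device["ip"])
```

Because the function accepts any iterable of lines, an open file object can be passed to it directly, and each resulting dict can be unpacked as keyword arguments for the connection step.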

Example

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import os
import re
import time
from concurrent.futures import ThreadPoolExecutor

import paramiko


def get_device_list(filename):
    """Read the device list from a text file and return a list of dicts.

    Each line holds: ip, username, password, alias. For example:
        1.1.1.1 admin admin sw1
        1.1.1.2 admin admin sw2

    Args:
        filename (str): file name
    """
    with open(filename, 'r') as f:
        device_list = []
        for line in f.readlines():
            ip, username, password, name = line.strip().split()
            device_list.append({
                "ip": ip,
                "username": username,
                "password": password,
                "name": name,
            })
    return device_list


class NetworkDevice(object):
    def __init__(self, ip="", username="", password="", name="", port=22):
        self.conn = None
        # Connect to the IP when given, otherwise fall back to the alias.
        # (The flattened original left self.ip unset when only a name was passed.)
        host = ip.strip() or name.strip()
        if not host:
            raise ValueError("device connection address (ip or alias) is required")
        self.ip = host
        self.name = name.strip() or host
        self.port = int(port)
        self.username = username
        self.password = password
        self._open_ssh()

    def _open_ssh(self):
        """Initialize the SSH connection.

        Opens an interactive (simulated terminal) shell so that commands
        can be executed until the session ends.

        Raises:
            Any exception paramiko raises when the connection fails.
        """
        ssh_connect_params = {
            "hostname": self.ip,
            "port": self.port,
            "username": self.username,
            "password": self.password,
            "look_for_keys": False,
            "allow_agent": False,
            "timeout": 5,  # TCP connection timeout
        }
        client = paramiko.SSHClient()
        client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
        client.connect(**ssh_connect_params)
        # Keep a reference to the client so the session is not torn down
        # when this method returns.
        self.client = client
        self.conn = client.invoke_shell(term="vt100", width=500, height=1000)

    def exec_cmd(self, cmd, recv_time=3):
        """Execute a command on the logged-in device.

        Args:
            cmd (str): command string
            recv_time (int, optional): seconds to wait before reading the
                echoed output. Defaults to 3.

        Raises:
            EOFError: no output was received, so the connection probably failed.

        Returns:
            str: the decoded output
        """
        cmd = cmd.strip() + "\n"
        # Disable paging. The flattened source reads "screen disable";
        # on H3C Comware the command is "screen-length disable".
        self.conn.sendall("screen-length disable\n")
        self.conn.sendall(cmd)
        time.sleep(int(recv_time))
        output = self.conn.recv(1024024)
        if len(output) == 0:
            raise EOFError("connection may be closed, no output received")
        return output.decode('utf-8', 'ignore')


dev = {
    "ip": "192.168.56.21",
    "username": "netdevops",
    "password": "Admin@h4c.com",
    "name": "sw1",
}

# sw1 = NetworkDevice(**dev)
# ret = sw1.exec_cmd("dis version")
# print(ret)


def parse_interface_drop(output):
    """Sum the dropped-packet counters of the outbound queues.

    Command and output example:
        # [H3C]dis qos queue-statistics interface outbound | in "^ Drop"
        #  Dropped: 0 packets, 0 bytes
    """
    ptn = re.compile(r"\s(\S+):\s+(\d+)\s+(\S+),\s+(\d+)\s+(\S+)")
    count = 0
    for i in ptn.findall(output):
        count += int(i[1])  # group 2 is the packet counter
    return count


def run(cmd, **conn_parms):
    """Log in to a single device, execute the command, parse the drop statistics."""
    sw = NetworkDevice(**conn_parms)
    output = sw.exec_cmd(cmd)
    drop_count = parse_interface_drop(output)
    # Three placeholders for three values (the flattened source had only two).
    return "%s %s %s" % (conn_parms.get("name"), conn_parms.get("ip"), drop_count)


# cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"'
# ret = run(cmd, **dev)
# print(ret)

if __name__ == "__main__":
    # Get the device list, then log in to the devices from multiple threads
    # and collect the results.
    with ThreadPoolExecutor(10) as pool:
        futures = []
        cmd = r'dis qos queue-statistics interface outbound | in "^ Drop"'
        dev_info = get_device_list("./iplist.txt")
        for d in dev_info:
            future = pool.submit(run, cmd, **d)
            futures.append(future)

        # for f in futures:
        #     print(f.result())

        # Write the results to a file named after the run time, accurate to the hour.
        os.makedirs("./drops", exist_ok=True)  # the log directory must exist
        with open("./drops/%s.log" % time.strftime("%Y%m%d_%H"), 'w') as f:
            for line in futures:
                f.write(line.result() + "\n")
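The parsing and multi-threading steps can be checked without a real switch. The sketch below reuses the regular expression from the script and feeds it canned output. The sample echo text, the counter values, and the names sum_dropped_packets and fake_collect are assumptions made for illustration; fake_collect only stands in for run() and performs no SSH.

```python
import re
from concurrent.futures import ThreadPoolExecutor

# Same pattern as parse_interface_drop in the script.
DROP_PATTERN = re.compile(r"\s(\S+):\s+(\d+)\s+(\S+),\s+(\d+)\s+(\S+)")

# Assumed sample echo: two queues, each with a dropped-packet counter.
SAMPLE_OUTPUT = (
    "[H3C]dis qos queue-statistics interface outbound\n"
    " Dropped: 3 packets, 192 bytes\n"
    " Dropped: 4 packets, 256 bytes\n"
)


def sum_dropped_packets(output):
    """Add up the packet counter (second group) of every matching line."""
    return sum(int(match[1]) for match in DROP_PATTERN.findall(output))


def fake_collect(device):
    """Hypothetical stand-in for run(): no SSH, parses the canned output."""
    drops = sum_dropped_packets(SAMPLE_OUTPUT)
    return "%s %s %s" % (device["name"], device["ip"], drops)


devices = [
    {"name": "sw1", "ip": "1.1.1.1"},
    {"name": "sw2", "ip": "1.1.1.2"},
]

with ThreadPoolExecutor(max_workers=10) as pool:
    # Keeping the futures in a list preserves the device order, even
    # though the calls themselves may finish in any order.
    futures = [pool.submit(fake_collect, d) for d in devices]
    results = [f.result() for f in futures]

for line in results:
    print(line)
```

Calling result() on each future also re-raises any exception from the worker, so a failed login to one device would surface at that point rather than being silently lost.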
I hope this article has been helpful. If you found it useful, please share it so that more people can see it.
