Files
python_skripte/ssh_with_sub/main.py
2024-01-25 10:03:49 +01:00

77 lines
2.9 KiB
Python
Executable File

#!/usr/bin/env python3.10
from csv import DictReader
import os, socket, subprocess
def read_csv(csv_file = str):
with open(csv_file, newline='') as csv:
read_server = DictReader(csv)
server_liste, user_liste = [] , []
for row in read_server:
user_liste.append(row['user'])
server_liste.append(row['\ufeffserver'])
return {
'server' : server_liste,
'user' : user_liste
}
def ssh_connection(con_server= str,con_user = str,bash_script = """ls"""):
return subprocess.Popen(
"ssh -T {user}@{host} '{cmd}'".format(user=con_user, host=con_server, cmd=bash_script),
shell=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
).communicate()
def ssh_connection_tt(con_server= str,con_user = str,bash_script = """ls"""):
return subprocess.Popen(
"echo '{cmd};exit'|ssh {host} -tt sudo su - {user} ".format(user=con_user, host=con_server, cmd=bash_script),
shell=True,
universal_newlines=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
).communicate()
def ssh_check(con_server = str, check_port = 22):
response = os.system("ping -c 1 " + con_server + '>& /dev/null')
if response == 0:
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
return s.connect_ex((con_server, check_port)) == 0
else:
return False
if __name__ == "__main__":
script_folder = os.path.dirname(os.path.realpath(__file__))
server_csv = script_folder + os.sep + "test.csv"
run_script_bash = """
GREP_PACK="atlas_ibtools"
echo "test"
bash --version
"""
run_oneline_bash="""ll; echo 'test'"""
for count in range(0,len(read_csv(server_csv)['server'])):
servername = read_csv(server_csv)['server'][count]
con_username = read_csv(server_csv)['user'][count]
user_server_con = str(con_username + '@' + servername)
print(servername, "verbindng wird geprüft...",sep=' ')
if ssh_check(con_server=servername) is True:
print(servername, "erreichbar...", sep=' ')
for show_output in list(ssh_connection_tt(con_server=servername, con_user=con_username, bash_script=run_oneline_bash)):
print(show_output.rstrip())
for show_output in list(ssh_connection(con_server=servername, con_user=con_username, bash_script=run_script_bash)):
print(show_output.rstrip())
else:
print(servername, "nicht erreichbar oder kein ssh!!", sep=' ')
#if ssh_check(con_server=servername) is True:
# print(servername, "port offen")
#print("Server:", read_csv(server_csv)['server'][count], "User:", read_csv(server_csv)['user'][count], sep=" ")
#ssh_connection(con_server=read_csv("test.csv")['server'][count])