[Python] 내부 IP 검색, 포트 확인, SSH 호스트네임 확인
반응형
저의 경우 업무를 위해 개발을 시작할 때 대충 절차가 이렇습니다.
- 회사 VPN 연결
- 회사의 엣지컴퓨팅 개발보드에 SSH 접속
- VSCode로 개발 시작
하는 절차를 거치고 있습니다. 장비마다 네트워크 이름을 별도로 설정해놓지 않아서 매번 포트스캐너를 사용하고, 22번 포트가 열려있는 장치 중에서 내부 호스트이름이 맞는 것을 찾는 것은 매우 귀찮은 일입니다. 언제나 그래왔던 것 처럼 귀찮은 반복작업은 개발자의 큰 적이죠. 파이썬을 사용해서 필요한 헤메는 절차를 자동으로 수행하는 스크립트를 구현합니다.
1 단계. 스캔범위 얻기
import socket
MY_IP = socket.gethostbyname(socket.gethostname())
IP_RANGE = [".".join([*MY_IP.split(".")[:3], str(i)]) for i in range(1, 255)]
# IP 범위를 직접 지정하는 경우
# IP_RANGE = [f"192.168.0.{i}" for i in range(1, 255)]
print(f"IP SCAN RANGE: {IP_RANGE[0]}-{IP_RANGE[-1]}")
현재 컴퓨터의 IP를 이용해 스캔할 IP범위를 얻습니다. 혹은 주석에 있는 것 처럼 ipconfig
(윈도우) ifconfig
(리눅스)를 통해 알아낸 기본 게이트웨이를 직접 지정할 수도 있습니다. 직접 접속해 보기 전 까지는 어떤 해당 장비의 어떤 포트가 열려있을지 모르니 결국 가능한 모든 범위를 다 스캔해봐야 합니다.
2 단계. 포트 스캔
import socket
from typing import List
MY_IP = socket.gethostbyname(socket.gethostname())
IP_RANGE = [".".join([*MY_IP.split(".")[:3], str(i)]) for i in range(1, 255)]
PORT = 22
# IP 범위를 직접 지정하는 경우
# IP_RANGE = [f"192.168.0.{i}" for i in range(1, 255)]
print(f"IP SCAN RANGE: {IP_RANGE[0]}-{IP_RANGE[-1]}")
socket.setdefaulttimeout(3)
accept_ip: List[str] = []
error_msg: List[str] = []
for ip in IP_RANGE:
print(f"\rscanning... {ip}", end="")
try:
con = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
con.connect((ip, PORT))
con.send(b'Primal Security \n')
service_name = con.recv(1024)
host_name = socket.getfqdn(ip)
accept_ip.append(f"{ip}({host_name}): {service_name}")
except socket.error as msg:
error_msg.append(f"{ip}: {msg}")
finally:
con.close()
print()
print("\n".join(error_msg))
print("\n".join(accept_ip))
for 루프를 이용해 범위 내의 IP에서 22번 포트가 열려있는 IP를 찾습니다. 하지만 한번 스캔하는 데 13분 정도가 걸리는 치명적인 단점이 생겼습니다.
3 단계. 스레드와 큐를 사용하여 포트 검색속도 늘리기
한번에 하나의 IP만 확인하는 것이 아니라 동시에 여러 IP를 확인하는 방식을 사용하면 스캔에 걸리는 시간을 대폭 줄일 수 있습니다.
import queue
import socket
import threading
from typing import List
MY_IP = socket.gethostbyname(socket.gethostname())
IP_RANGE = [".".join([*MY_IP.split(".")[:3], str(i)]) for i in range(1, 255)]
PORT = 22
THREAD_NUM = 30
print(f"IP SCAN RANGE: {IP_RANGE[0]}-{IP_RANGE[-1]}")
socket.setdefaulttimeout(1)
QUE: queue.Queue = queue.Queue()
PRINT_LOCK = threading.Lock()
accept_ip: List[str] = []
error_msg: List[str] = []
def scan_ip(ip):
print(f"\rscanning... {ip}", end="")
try:
con = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
con.connect((ip, PORT))
con.send(b'Primal Security \n')
service_name = con.recv(1024)
host_name = socket.getfqdn(ip)
with PRINT_LOCK:
accept_ip.append(f"{ip} ({host_name}): {str(service_name)}")
except socket.error as msg:
error_msg.append(f"{ip}: {msg}")
finally:
con.close()
def result_get():
while True:
worker = QUE.get()
scan_ip(worker)
QUE.task_done()
if __name__ == '__main__':
for i in range(THREAD_NUM):
thread_push = threading.Thread(target=result_get)
thread_push.daemon = True
thread_push.start()
for ip in IP_RANGE:
QUE.put(ip)
QUE.join()
print()
print("\n".join(error_msg))
print("\n".join(accept_ip))
4단계 SSH 호스트네임 확인하기
여기서는 외부 패키지인 Paramiko를 사용합니다. SSH 유저 이름과 비밀번호를 알고 있어야 사용할 수 있습니다.
import queue
import socket
import threading
from typing import List
import paramiko
MY_IP = socket.gethostbyname(socket.gethostname())
IP_RANGE = [".".join([*MY_IP.split(".")[:3], str(i)]) for i in range(1, 255)]
PORT = 22
THREAD_NUM = 40
USERNAME = "유저이름"
PASSWORD = "비밀번호"
print(f"IP SCAN RANGE: {IP_RANGE[0]}-{IP_RANGE[-1]}")
socket.setdefaulttimeout(1)
QUE: queue.Queue = queue.Queue()
PRINT_LOCK = threading.Lock()
accept_ip: List[str] = []
accept_ssh: List[str] = []
error_msg: List[str] = []
def scan_ip(ip):
print(f"\rscanning... {ip}", end="")
try:
con = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
con.connect((ip, PORT))
con.send(b'Primal Security \n')
service_name = con.recv(1024).decode('utf-8').strip()
ip_hostname = socket.getfqdn(ip)
ssh = paramiko.SSHClient()
ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
ssh.connect(ip, port=22, username=USERNAME, password=PASSWORD)
stdout = ssh.exec_command("hostname")[1]
ssh_hostname = stdout.read().decode('utf-8').strip()
ssh.close()
with PRINT_LOCK:
accept_ip.append(f"{ip} ({ip_hostname}): {service_name}")
accept_ssh.append(f"{ip} ({ssh_hostname})")
except socket.error as msg:
error_msg.append(f"{ip}: {msg}")
con.close()
def result_get():
while True:
worker = QUE.get()
scan_ip(worker)
QUE.task_done()
if __name__ == '__main__':
for i in range(THREAD_NUM):
thread_push = threading.Thread(target=result_get)
thread_push.daemon = True
thread_push.start()
for ip in IP_RANGE:
QUE.put(ip)
QUE.join()
print()
print("\n".join(error_msg))
print("\n".join(accept_ip))
print("\n".join(accept_ssh))
반응형
'프로그래밍 > 파이썬' 카테고리의 다른 글
Python 가상환경(venv)에 apt-get으로 설치한 패키지 집어넣기 (0) | 2021.12.31 |
---|---|
Python: 직접 만든 클래스의 이름공간 변경하기 (0) | 2019.12.29 |
Python 패키지 개발일지 10: 스핑크스 레퍼런스 페이지 만들기 (0) | 2019.12.29 |
Python 패키지 개발일지 09: 패키지를 갈아엎게 하는 문제들 (0) | 2019.12.28 |
Python 패키지 개발일지 08: 스핑크스 & 깃허브 페이지 (중급) (0) | 2019.12.27 |
댓글