soma0sd

코딩 & 과학 & 교육

[Python] 내부 IP 검색, 포트 확인, SSH 호스트네임 확인

반응형

저의 경우 업무를 위해 개발을 시작할 때 대충 절차가 이렇습니다.

  1. 회사 VPN 연결
  2. 회사의 엣지컴퓨팅 개발보드에 SSH 접속
  3. VSCode로 개발 시작

하는 절차를 거치고 있습니다. 장비마다 네트워크 이름을 별도로 설정해놓지 않아서 매번 포트스캐너를 사용하고, 22번 포트가 열려있는 장치 중에서 내부 호스트이름이 맞는 것을 찾는 것은 매우 귀찮은 일입니다. 언제나 그래왔던 것 처럼 귀찮은 반복작업은 개발자의 큰 적이죠. 파이썬을 사용해서 필요한 헤메는 절차를 자동으로 수행하는 스크립트를 구현합니다.

1 단계. 스캔범위 얻기

import socket

MY_IP = socket.gethostbyname(socket.gethostname())
IP_RANGE = [".".join([*MY_IP.split(".")[:3], str(i)]) for i in range(1, 255)]

# IP 범위를 직접 지정하는 경우
# IP_RANGE = [f"192.168.0.{i}" for i in range(1, 255)]

print(f"IP SCAN RANGE: {IP_RANGE[0]}-{IP_RANGE[-1]}")

현재 컴퓨터의 IP를 이용해 스캔할 IP범위를 얻습니다. 혹은 주석에 있는 것 처럼 ipconfig(윈도우) ifconfig(리눅스)를 통해 알아낸 기본 게이트웨이를 직접 지정할 수도 있습니다. 직접 접속해 보기 전 까지는 어떤 해당 장비의 어떤 포트가 열려있을지 모르니 결국 가능한 모든 범위를 다 스캔해봐야 합니다.

2 단계. 포트 스캔

import socket
from typing import List

MY_IP = socket.gethostbyname(socket.gethostname())
IP_RANGE = [".".join([*MY_IP.split(".")[:3], str(i)]) for i in range(1, 255)]
PORT = 22

# IP 범위를 직접 지정하는 경우
# IP_RANGE = [f"192.168.0.{i}" for i in range(1, 255)]

print(f"IP SCAN RANGE: {IP_RANGE[0]}-{IP_RANGE[-1]}")

socket.setdefaulttimeout(3)

accept_ip: List[str] = []
error_msg: List[str] = []
for ip in IP_RANGE:
    print(f"\rscanning... {ip}", end="")
    try:
        con = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        con.connect((ip, PORT))
        con.send(b'Primal Security \n')
        service_name = con.recv(1024)
        host_name = socket.getfqdn(ip)
        accept_ip.append(f"{ip}({host_name}): {service_name}")
    except socket.error as msg:
        error_msg.append(f"{ip}: {msg}")
    finally:
        con.close()

print()
print("\n".join(error_msg))
print("\n".join(accept_ip))

for 루프를 이용해 범위 내의 IP에서 22번 포트가 열려있는 IP를 찾습니다. 하지만 한번 스캔하는 데 13분 정도가 걸리는 치명적인 단점이 생겼습니다.

3 단계. 스레드와 큐를 사용하여 포트 검색속도 늘리기

한번에 하나의 IP만 확인하는 것이 아니라 동시에 여러 IP를 확인하는 방식을 사용하면 스캔에 걸리는 시간을 대폭 줄일 수 있습니다.

import queue
import socket
import threading
from typing import List

MY_IP = socket.gethostbyname(socket.gethostname())
IP_RANGE = [".".join([*MY_IP.split(".")[:3], str(i)]) for i in range(1, 255)]
PORT = 22
THREAD_NUM = 30


print(f"IP SCAN RANGE: {IP_RANGE[0]}-{IP_RANGE[-1]}")

socket.setdefaulttimeout(1)
QUE: queue.Queue = queue.Queue()
PRINT_LOCK = threading.Lock()

accept_ip: List[str] = []
error_msg: List[str] = []

def scan_ip(ip):
    print(f"\rscanning... {ip}", end="")
    try:
        con = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        con.connect((ip, PORT))
        con.send(b'Primal Security \n')
        service_name = con.recv(1024)
        host_name = socket.getfqdn(ip)
        with PRINT_LOCK:
            accept_ip.append(f"{ip} ({host_name}): {str(service_name)}")
    except socket.error as msg:
        error_msg.append(f"{ip}: {msg}")
    finally:
        con.close()

def result_get():
    while True:
        worker = QUE.get()
        scan_ip(worker)
        QUE.task_done()

if __name__ == '__main__':
    for i in range(THREAD_NUM):
        thread_push = threading.Thread(target=result_get)
        thread_push.daemon = True
        thread_push.start()
    for ip in IP_RANGE:
        QUE.put(ip)
    QUE.join()

    print()
    print("\n".join(error_msg))
    print("\n".join(accept_ip))

4단계 SSH 호스트네임 확인하기

여기서는 외부 패키지인 Paramiko를 사용합니다. SSH 유저 이름과 비밀번호를 알고 있어야 사용할 수 있습니다.

import queue
import socket
import threading
from typing import List

import paramiko

MY_IP = socket.gethostbyname(socket.gethostname())
IP_RANGE = [".".join([*MY_IP.split(".")[:3], str(i)]) for i in range(1, 255)]
PORT = 22
THREAD_NUM = 40

USERNAME = "유저이름"
PASSWORD = "비밀번호"

print(f"IP SCAN RANGE: {IP_RANGE[0]}-{IP_RANGE[-1]}")

socket.setdefaulttimeout(1)
QUE: queue.Queue = queue.Queue()
PRINT_LOCK = threading.Lock()

accept_ip: List[str] = []
accept_ssh: List[str] = []
error_msg: List[str] = []

def scan_ip(ip):
    print(f"\rscanning... {ip}", end="")
    try:
        con = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        con.connect((ip, PORT))
        con.send(b'Primal Security \n')
        service_name = con.recv(1024).decode('utf-8').strip()
        ip_hostname = socket.getfqdn(ip)
        ssh = paramiko.SSHClient()
        ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy)
        ssh.connect(ip, port=22, username=USERNAME, password=PASSWORD)
        stdout = ssh.exec_command("hostname")[1]
        ssh_hostname = stdout.read().decode('utf-8').strip()
        ssh.close()
        with PRINT_LOCK:
            accept_ip.append(f"{ip} ({ip_hostname}): {service_name}")
            accept_ssh.append(f"{ip} ({ssh_hostname})")
    except socket.error as msg:
        error_msg.append(f"{ip}: {msg}")
        con.close()


def result_get():
    while True:
        worker = QUE.get()
        scan_ip(worker)
        QUE.task_done()

if __name__ == '__main__':
    for i in range(THREAD_NUM):
        thread_push = threading.Thread(target=result_get)
        thread_push.daemon = True
        thread_push.start()
    for ip in IP_RANGE:
        QUE.put(ip)
    QUE.join()

    print()
    print("\n".join(error_msg))
    print("\n".join(accept_ip))
    print("\n".join(accept_ssh))
반응형
태그:

댓글

End of content

No more pages to load