Files
netbox-scanner/ipscan-v2.py

183 lines
6.1 KiB
Python

import os
import nmap
import pynetbox
import requests
import socket
import subprocess
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed
# Disable SSL Warnings
requests.packages.urllib3.disable_warnings()
# Disable SSL Verification
os.environ['PYTHONHTTPSVERIFY'] = '0'
# Initiate the port scanner
nm = nmap.PortScanner()
# Scan the subnet for hosts (replace with your networks)
networks_env = os.getenv("NETWORKS", "192.168.85.0/24,192.168.86.0/24")
networks = [network.strip() for network in networks_env.split(",") if network.strip()]
# Scan source configuration: env, netbox, or mixed
scan_source = os.getenv("SCAN_SOURCE", "env").strip().lower()
netbox_prefix_status = os.getenv("NETBOX_PREFIX_STATUS", "").strip().lower()
# Parallelism
max_scan_workers = int(os.getenv("MAX_SCAN_WORKERS", "5"))
max_import_workers = int(os.getenv("MAX_IMPORT_WORKERS", "5"))
# Watchdog: comma-separated IPs that must stay reachable during the scan
watchdog_ips_env = os.getenv("WATCHDOG_IPS", "").strip()
watchdog_ips = [ip.strip() for ip in watchdog_ips_env.split(",") if ip.strip()]
watchdog_interval = int(os.getenv("WATCHDOG_INTERVAL", "10"))
# NetBox configuration
netbox_url = os.getenv("NETBOX_URL", "https://netbox.xxxxx.xx/")
netbox_token = os.getenv("NETBOX_TOKEN", "xxxxx")
ssl_verify = os.getenv("SSL_VERIFY", "false").lower() not in ("0", "false", "no", "n")
netbox = pynetbox.api(url=netbox_url, token=netbox_token)
netbox.http_session.verify = ssl_verify
tenant = os.getenv("TENANT", "Xxxxx Praha")
# Shared abort flag set by the watchdog when a monitored IP goes down
abort_event = threading.Event()
def ping(ip):
result = subprocess.run(
["ping", "-c", "1", "-W", "2", ip],
stdout=subprocess.DEVNULL,
stderr=subprocess.DEVNULL,
)
return result.returncode == 0
def watchdog_loop():
if not watchdog_ips:
return
print(f"Watchdog monitoring: {', '.join(watchdog_ips)} (interval: {watchdog_interval}s)")
while not abort_event.is_set():
for ip in watchdog_ips:
if not ping(ip):
print(f"[WATCHDOG] {ip} is unreachable — aborting scan.")
abort_event.set()
return
abort_event.wait(watchdog_interval)
def load_networks_from_netbox():
print("Loading networks from NetBox...")
prefixes = netbox.ipam.prefixes.filter(status=netbox_prefix_status) if netbox_prefix_status else netbox.ipam.prefixes.all()
networks_from_netbox = []
for prefix in prefixes:
address = getattr(prefix, 'prefix', None)
if address:
networks_from_netbox.append(address)
print(f"Found NetBox prefix: {address}")
return networks_from_netbox
if scan_source == 'netbox':
networks = load_networks_from_netbox()
elif scan_source == 'mixed':
networks = networks + load_networks_from_netbox()
if not networks:
raise ValueError('No networks configured to scan. Set NETWORKS or SCAN_SOURCE to include NetBox prefixes.')
def scan_network(network):
if abort_event.is_set():
print(f"Skipping network {network} — abort signalled.")
return []
print(f"Scanning network: {network}")
nm.scan(hosts=network, arguments='-p 1-32768 -T4 --host-timeout 2m')
host_results = []
for host in nm.all_hosts():
if abort_event.is_set():
break
if 'tcp' in nm[host]:
ports = [port for port in nm[host]['tcp'] if nm[host]['tcp'][port]['state'] == 'open']
ports_str = ' '.join(str(port) for port in ports)
else:
ports_str = ''
host_results.append((host, nm[host]['status']['state'], ports_str))
print(f"Host: {host}, Status: {nm[host]['status']['state']}, Open Ports: {ports_str}")
return host_results
def add_ip_to_netbox(host, status, ports):
if status == 'up':
try:
hostname = socket.gethostbyaddr(host)[0]
except socket.herror:
hostname = host
ip_data = {
"address": f"{host}/24",
"dns_name": hostname,
"status": "active",
"tenant": tenant,
"comments": f"Open ports: {ports}",
}
print(f"Adding IP address: {ip_data['address']}, Hostname: {hostname}, Open Ports: {ports}")
netbox.ipam.ip_addresses.create(ip_data)
# Start watchdog in background thread
watchdog_thread = threading.Thread(target=watchdog_loop, daemon=True)
watchdog_thread.start()
# Scan networks in parallel
hosts_list = []
with ThreadPoolExecutor(max_workers=max_scan_workers) as executor:
future_to_network = {executor.submit(scan_network, network): network for network in networks}
for future in as_completed(future_to_network):
network = future_to_network[future]
try:
data = future.result()
hosts_list.extend(data)
except Exception as exc:
print(f"{network} generated an exception: {exc}")
if abort_event.is_set():
print("Abort flag set — cancelling remaining scan futures.")
for f in future_to_network:
f.cancel()
break
# Stop watchdog
abort_event.set()
if abort_event.is_set() and not hosts_list:
raise SystemExit("Scan aborted by watchdog before any results were collected.")
# Import collected results into NetBox
with ThreadPoolExecutor(max_workers=max_import_workers) as executor:
futures = [executor.submit(add_ip_to_netbox, host, status, ports) for host, status, ports in hosts_list]
for future in as_completed(futures):
future.result()
# Get all IP addresses from NetBox
all_ip_addresses = netbox.ipam.ip_addresses.all()
# Build list of hostnames found during this scan
scanned_hosts = []
for host, status, ports in hosts_list:
if status == 'up':
try:
hostname = socket.gethostbyaddr(host)[0]
except socket.herror:
hostname = host
scanned_hosts.append(hostname)
# Mark IPs not seen in this scan as offline
for ip_address in all_ip_addresses:
if ip_address.description not in scanned_hosts:
ip_address.status = 'offline'
ip_address.save()
print(f"Marked IP address {ip_address.address} as offline in NetBox.")