# Source listing metadata: 127 lines, 4.7 KiB, Python
import os
|
|
import nmap
|
|
import pynetbox
|
|
import requests
|
|
import socket
|
|
from concurrent.futures import ThreadPoolExecutor, as_completed
|
|
|
|
# Silence the SSL warnings raised by the urllib3 copy exposed through requests.
requests.packages.urllib3.disable_warnings()

# Disable SSL verification via the PYTHONHTTPSVERIFY environment variable.
# NOTE(review): confirm the interpreter in use actually honours this variable.
os.environ['PYTHONHTTPSVERIFY'] = '0'

# Port scanner instance created once at import time.
nm = nmap.PortScanner()

# Networks to scan: comma-separated list from NETWORKS (replace the defaults
# with your networks). Blank entries are dropped after trimming whitespace.
networks_env = os.environ.get("NETWORKS", "192.168.85.0/24,192.168.86.0/24")
networks = [entry for entry in map(str.strip, networks_env.split(",")) if entry]

# Where the scan targets come from: env, netbox, or mixed.
scan_source = os.environ.get("SCAN_SOURCE", "env").strip().lower()
# Optional status used to filter NetBox prefixes; empty means "no filter".
netbox_prefix_status = os.environ.get("NETBOX_PREFIX_STATUS", "").strip().lower()

# NetBox connection settings.
netbox_url = os.environ.get("NETBOX_URL", "https://netbox.xxxxx.xx/")
netbox_token = os.environ.get("NETBOX_TOKEN", "xxxxx")
# Certificate verification stays off for "0", "false", "no" or "n"
# (case-insensitive); any other value turns it on.
ssl_verify = os.environ.get("SSL_VERIFY", "false").lower() not in {"0", "false", "no", "n"}
netbox = pynetbox.api(url=netbox_url, token=netbox_token)
netbox.http_session.verify = ssl_verify

# Tenant value attached to every IP address this script creates.
tenant = os.environ.get("TENANT", "Xxxxx Praha")
|
|
|
|
|
|
def load_networks_from_netbox():
    """Collect the prefixes known to NetBox IPAM.

    When ``netbox_prefix_status`` is non-empty, only prefixes with that status
    are requested; otherwise every prefix is listed. Each prefix found is
    printed as it is collected.

    Returns:
        list: The ``prefix`` attribute of every returned record that has a
        truthy one, in the order NetBox returned them.
    """
    print("Loading networks from NetBox...")

    if netbox_prefix_status:
        records = netbox.ipam.prefixes.filter(status=netbox_prefix_status)
    else:
        records = netbox.ipam.prefixes.all()

    found = []
    for record in records:
        cidr = getattr(record, 'prefix', None)
        if not cidr:
            # Skip records that carry no usable prefix value.
            continue
        found.append(cidr)
        print(f"Found NetBox prefix: {cidr}")
    return found
|
|
|
|
|
|
# Choose the scan targets according to SCAN_SOURCE:
#   netbox -> only the prefixes loaded from NetBox
#   mixed  -> the NETWORKS entries followed by the NetBox prefixes
#   env    -> only the NETWORKS entries (also the fallback for unknown values)
if scan_source == 'netbox':
    networks = load_networks_from_netbox()
elif scan_source == 'mixed':
    networks = networks + load_networks_from_netbox()
elif scan_source != 'env':
    # Previously an unrecognised value silently behaved like "env"; keep that
    # behaviour but make the misconfiguration visible.
    print(f"Unknown SCAN_SOURCE '{scan_source}', using NETWORKS from the environment.")

# Drop repeated entries (first occurrence wins) so a prefix listed both in
# NETWORKS and in NetBox is not scanned twice. Entries are coerced to str so
# every target reaches the scanner as plain text.
networks = list(dict.fromkeys(str(network) for network in networks))

if not networks:
    raise ValueError('No networks configured to scan. Set NETWORKS or SCAN_SOURCE to include NetBox prefixes.')
|
|
|
|
def scan_network(network, scanner=None):
    """Port-scan one network and summarise every host the scanner reports.

    The scan covers TCP ports 1-32768 with ``-T4`` timing and a 2 minute
    per-host timeout. Each host is printed as it is processed.

    Args:
        network: Target specification handed to the scanner (e.g. a CIDR
            string).
        scanner: Optional scanner object providing ``scan(hosts=...,
            arguments=...)``, ``all_hosts()`` and item access by host. When
            omitted, a new ``nmap.PortScanner`` is created for this call.
            This function runs in several worker threads at once, and a
            shared scanner keeps the result of its most recent ``scan()``,
            so sharing one would let a thread read another network's hosts.

    Returns:
        list[tuple]: One ``(host, state, open_ports)`` tuple per reported
        host, where ``open_ports`` is a space-separated string of open TCP
        ports (empty when the host has no TCP results).
    """
    if scanner is None:
        scanner = nmap.PortScanner()

    print(f"Scanning network: {network}")
    # Adding a host-timeout of 2 minutes
    scanner.scan(hosts=network, arguments='-p 1-32768 -T4 --host-timeout 2m')

    host_results = []
    for host in scanner.all_hosts():
        details = scanner[host]
        state = details['status']['state']
        # Hosts without any TCP section simply have no open ports to report.
        tcp_ports = details['tcp'] if 'tcp' in details else {}
        ports_str = ' '.join(
            str(port) for port, info in tcp_ports.items() if info['state'] == 'open'
        )
        host_results.append((host, state, ports_str))
        print(f"Host: {host}, Status: {state}, Open Ports: {ports_str}")
    return host_results
|
|
|
|
def add_ip_to_netbox(host, status, ports, prefix_length=24):
    """Create a NetBox IP address record for a host that is up.

    Hosts whose status is anything other than ``'up'`` are ignored. For live
    hosts a reverse DNS lookup supplies the ``dns_name``; when the lookup
    fails, the IP address itself is used instead.

    Args:
        host: IP address of the scanned host.
        status: Host state reported by the scan (``'up'`` triggers creation).
        ports: Space-separated open ports, stored in the record's comments.
        prefix_length: Mask length appended to ``host`` to form the NetBox
            address. Defaults to 24, the value that was previously hard-coded.

    Returns:
        None.
    """
    if status != 'up':
        return

    try:
        hostname = socket.gethostbyaddr(host)[0]
    except (socket.herror, socket.gaierror):
        # Use the IP if the hostname couldn't be resolved. gaierror is caught
        # as well so an address-resolution failure does not abort the run.
        hostname = host

    ip_data = {
        "address": f"{host}/{prefix_length}",
        "dns_name": hostname,
        "status": "active",
        # NOTE(review): the raw TENANT string is sent here; confirm NetBox
        # accepts a name in this field rather than an ID.
        "tenant": tenant,
        "comments": f"Open ports: {ports}",
        # Add other fields as needed
    }

    # NOTE(review): this always creates a record and never updates an
    # existing one; verify how repeated runs should treat known addresses.
    print(f"Adding IP address: {ip_data['address']}, Hostname: {hostname}, Open Ports: {ports}")
    netbox.ipam.ip_addresses.create(ip_data)
|
|
|
|
# Scan all networks in parallel and gather the per-host results.
hosts_list = []
with ThreadPoolExecutor(max_workers=5) as executor:  # Adjust number of workers as needed
    future_to_network = {executor.submit(scan_network, network): network for network in networks}
    for future in as_completed(future_to_network):
        network = future_to_network[future]
        try:
            data = future.result()
        except Exception as exc:
            # A failed scan of one network must not stop the others.
            print(f"{network} generated an exception: {exc}")
        else:
            hosts_list.extend(data)

# Add each IP address to NetBox, again in parallel.
with ThreadPoolExecutor(max_workers=5) as executor:  # Adjust number of workers as needed
    future_to_host = {
        executor.submit(add_ip_to_netbox, host, status, ports): host
        for host, status, ports in hosts_list
    }
    for future in as_completed(future_to_host):
        host = future_to_host[future]
        try:
            future.result()
        except Exception as exc:
            # Same policy as the scan phase: report the failing host and carry
            # on, instead of letting one rejected record abort the whole run.
            print(f"{host} generated an exception: {exc}")
|
|
|
|
# Get all IP addresses from NetBox
all_ip_addresses = netbox.ipam.ip_addresses.all()

# Collect the addresses and hostnames of every host the scan reported as up.
scanned_hosts = []
up_addresses = set()
for host, status, ports in hosts_list:
    if status != 'up':
        continue
    up_addresses.add(host)
    try:
        hostname = socket.gethostbyaddr(host)[0]
    except (socket.herror, socket.gaierror):
        hostname = host  # Use the IP if the hostname couldn't be resolved
    scanned_hosts.append(hostname)
scanned_hostnames = set(scanned_hosts)  # O(1) membership tests below

# Check each IP address in NetBox and mark the ones the scan did not see.
# NOTE(review): this walks every address in NetBox, including ones outside the
# scanned networks or in networks whose scan failed; confirm that is intended.
for ip_address in all_ip_addresses:
    # Hostnames are stored in dns_name when records are created, so match on
    # that field (not description) and on the bare IP without its mask.
    bare_address = str(ip_address.address).split('/')[0]
    if bare_address in up_addresses or ip_address.dns_name in scanned_hostnames:
        continue
    # NOTE(review): 'offline' must exist as an IP address status in the target
    # NetBox instance; verify before relying on this.
    ip_address.status = 'offline'
    ip_address.save()
    print(f"Marked IP address {ip_address.address} as offline in NetBox.")
|