mirror of
https://github.com/bitcoin/bitcoin.git
synced 2026-01-31 10:41:08 +00:00
The encoding arg is confusing, because it is not applied consistently for all IO. Also, it is useless, as the majority of files are ASCII encoded, which are fine to encode and decode with any mode. Moreover, UTF-8 is already required for most scripts to work properly, so setting the encoding twice is redundant. So remove the encoding from most IO. It would be fine to remove from all IO, however I kept it for two files: * contrib/asmap/asmap-tool.py: This specifically looks for utf-8 encoding errors, so it makes sense to sepecify the utf-8 encoding explicitly. * test/functional/test_framework/test_node.py: Reading the debug log in text mode specifically counts the utf-8 characters (not bytes), so it makes sense to specify the utf-8 encoding explicitly.
184 lines
5.8 KiB
Python
184 lines
5.8 KiB
Python
#!/usr/bin/env python3
|
|
# Copyright (c) 2014-2022 The Bitcoin Core developers
|
|
# Distributed under the MIT software license, see the accompanying
|
|
# file COPYING or http://www.opensource.org/licenses/mit-license.php.
|
|
"""Linux network utilities.
|
|
|
|
Roughly based on https://web.archive.org/web/20190424172231/http://voorloopnul.com/blog/a-python-netstat-in-less-than-100-lines-of-code/ by Ricardo Pascal
|
|
"""
|
|
|
|
import sys
|
|
import socket
|
|
import struct
|
|
import array
|
|
import os
|
|
|
|
# Easily unreachable address. Attempts to connect to it will stay within the machine.
|
|
# Used to avoid non-loopback traffic or DNS queries.
|
|
UNREACHABLE_PROXY_ARG = '-proxy=127.0.0.1:1'
|
|
|
|
# STATE_ESTABLISHED = '01'
|
|
# STATE_SYN_SENT = '02'
|
|
# STATE_SYN_RECV = '03'
|
|
# STATE_FIN_WAIT1 = '04'
|
|
# STATE_FIN_WAIT2 = '05'
|
|
# STATE_TIME_WAIT = '06'
|
|
# STATE_CLOSE = '07'
|
|
# STATE_CLOSE_WAIT = '08'
|
|
# STATE_LAST_ACK = '09'
|
|
STATE_LISTEN = '0A'
|
|
# STATE_CLOSING = '0B'
|
|
|
|
# Address manager size constants as defined in addrman_impl.h
|
|
ADDRMAN_NEW_BUCKET_COUNT = 1 << 10
|
|
ADDRMAN_TRIED_BUCKET_COUNT = 1 << 8
|
|
ADDRMAN_BUCKET_SIZE = 1 << 6
|
|
|
|
def get_socket_inodes(pid):
|
|
'''
|
|
Get list of socket inodes for process pid.
|
|
'''
|
|
base = '/proc/%i/fd' % pid
|
|
inodes = []
|
|
for item in os.listdir(base):
|
|
try:
|
|
target = os.readlink(os.path.join(base, item))
|
|
if target.startswith('socket:'):
|
|
inodes.append(int(target[8:-1]))
|
|
except FileNotFoundError:
|
|
pass
|
|
return inodes
|
|
|
|
def _remove_empty(array):
|
|
return [x for x in array if x !='']
|
|
|
|
def _convert_ip_port(array):
|
|
host,port = array.split(':')
|
|
# convert host from mangled-per-four-bytes form as used by kernel
|
|
host = bytes.fromhex(host)
|
|
host_out = ''
|
|
for x in range(0, len(host) // 4):
|
|
(val,) = struct.unpack('=I', host[x*4:(x+1)*4])
|
|
host_out += '%08x' % val
|
|
|
|
return host_out,int(port,16)
|
|
|
|
def netstat(typ='tcp'):
|
|
'''
|
|
Function to return a list with status of tcp connections at linux systems
|
|
To get pid of all network process running on system, you must run this script
|
|
as superuser
|
|
'''
|
|
with open('/proc/net/'+typ,'r') as f:
|
|
content = f.readlines()
|
|
content.pop(0)
|
|
result = []
|
|
for line in content:
|
|
line_array = _remove_empty(line.split(' ')) # Split lines and remove empty spaces.
|
|
tcp_id = line_array[0]
|
|
l_addr = _convert_ip_port(line_array[1])
|
|
r_addr = _convert_ip_port(line_array[2])
|
|
state = line_array[3]
|
|
inode = int(line_array[9]) # Need the inode to match with process pid.
|
|
nline = [tcp_id, l_addr, r_addr, state, inode]
|
|
result.append(nline)
|
|
return result
|
|
|
|
def get_bind_addrs(pid):
|
|
'''
|
|
Get bind addresses as (host,port) tuples for process pid.
|
|
'''
|
|
inodes = get_socket_inodes(pid)
|
|
bind_addrs = []
|
|
for conn in netstat('tcp') + netstat('tcp6'):
|
|
if conn[3] == STATE_LISTEN and conn[4] in inodes:
|
|
bind_addrs.append(conn[1])
|
|
return bind_addrs
|
|
|
|
# from: https://code.activestate.com/recipes/439093/
|
|
def all_interfaces():
|
|
'''
|
|
Return all interfaces that are up
|
|
'''
|
|
import fcntl # Linux only, so only import when required
|
|
|
|
is_64bits = sys.maxsize > 2**32
|
|
struct_size = 40 if is_64bits else 32
|
|
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
|
max_possible = 8 # initial value
|
|
while True:
|
|
bytes = max_possible * struct_size
|
|
names = array.array('B', b'\0' * bytes)
|
|
outbytes = struct.unpack('iL', fcntl.ioctl(
|
|
s.fileno(),
|
|
0x8912, # SIOCGIFCONF
|
|
struct.pack('iL', bytes, names.buffer_info()[0])
|
|
))[0]
|
|
if outbytes == bytes:
|
|
max_possible *= 2
|
|
else:
|
|
break
|
|
namestr = names.tobytes()
|
|
return [(namestr[i:i+16].split(b'\0', 1)[0],
|
|
socket.inet_ntoa(namestr[i+20:i+24]))
|
|
for i in range(0, outbytes, struct_size)]
|
|
|
|
def addr_to_hex(addr):
|
|
'''
|
|
Convert string IPv4 or IPv6 address to binary address as returned by
|
|
get_bind_addrs.
|
|
Very naive implementation that certainly doesn't work for all IPv6 variants.
|
|
'''
|
|
if '.' in addr: # IPv4
|
|
addr = [int(x) for x in addr.split('.')]
|
|
elif ':' in addr: # IPv6
|
|
sub = [[], []] # prefix, suffix
|
|
x = 0
|
|
addr = addr.split(':')
|
|
for i,comp in enumerate(addr):
|
|
if comp == '':
|
|
if i == 0 or i == (len(addr)-1): # skip empty component at beginning or end
|
|
continue
|
|
x += 1 # :: skips to suffix
|
|
assert x < 2
|
|
else: # two bytes per component
|
|
val = int(comp, 16)
|
|
sub[x].append(val >> 8)
|
|
sub[x].append(val & 0xff)
|
|
nullbytes = 16 - len(sub[0]) - len(sub[1])
|
|
assert (x == 0 and nullbytes == 0) or (x == 1 and nullbytes > 0)
|
|
addr = sub[0] + ([0] * nullbytes) + sub[1]
|
|
else:
|
|
raise ValueError('Could not parse address %s' % addr)
|
|
return bytearray(addr).hex()
|
|
|
|
def test_ipv6_local():
|
|
'''
|
|
Check for (local) IPv6 support.
|
|
'''
|
|
# By using SOCK_DGRAM this will not actually make a connection, but it will
|
|
# fail if there is no route to IPv6 localhost.
|
|
have_ipv6 = True
|
|
try:
|
|
s = socket.socket(socket.AF_INET6, socket.SOCK_DGRAM)
|
|
s.connect(('::1', 1))
|
|
except socket.error:
|
|
have_ipv6 = False
|
|
return have_ipv6
|
|
|
|
def test_unix_socket():
|
|
'''Return True if UNIX sockets are available on this platform.'''
|
|
try:
|
|
socket.AF_UNIX
|
|
except AttributeError:
|
|
return False
|
|
else:
|
|
return True
|
|
|
|
def format_addr_port(addr, port):
|
|
'''Return either "addr:port" or "[addr]:port" based on whether addr looks like an IPv6 address.'''
|
|
if ":" in addr:
|
|
return f"[{addr}]:{port}"
|
|
else:
|
|
return f"{addr}:{port}"
|