You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 

544 lines
20 KiB

import re
from typing import AnyStr, Iterable, Optional, Dict
from datetime import timedelta
from easysnmp import EasySNMPTimeoutError
from transliterate import translit
from django.utils.translation import gettext_lazy as _, gettext
from djing.lib import RuTimedelta, safe_int
from djing.lib.tln.tln import ValidationError as TlnValidationError, register_onu_ZTE_F660
from .base_intr import (
DevBase, SNMPBaseWorker, BasePort, DeviceImplementationError,
ListOrError, DeviceConfigurationError
)
def _norm_name(name: str, replreg=None):
    """Remove unwanted characters from ``name``.

    :param name: text to clean up.
    :param replreg: optional compiled regular expression; every match of it
        is removed from ``name``. When omitted, every run of non-word
        characters (``\\W``) is removed instead.
    :return: ``name`` with the matched characters stripped out.
    """
    if replreg is None:
        # Raw string literal: in a plain literal '\W' is an invalid escape
        # sequence, which newer Python versions warn about at compile time.
        return re.sub(pattern=r'\W{1,255}', repl='', string=name, flags=re.IGNORECASE)
    return replreg.sub('', name)
def plain_ip_device_mon_template(device) -> Optional[AnyStr]:
    """Render a ``define host{ ... }`` monitoring stanza for an IP device.

    The host name is built from the device primary key followed by the
    transliterated comment, passed through ``_norm_name``. A ``parents`` line
    is emitted only when the device has a parent device, and a ``_mac_addr``
    line only when the device has a MAC address.

    :param device: device object exposing ``pk``, ``comment``, ``ip_address``,
        ``mac_addr`` and ``parent_dev``.
    :return: the stanza text, lines joined by newlines.
    :raises ValueError: if ``device`` is falsy.
    """
    if not device:
        raise ValueError

    parent = device.parent_dev
    parent_host_name = None
    if parent:
        parent_label = "%d%s" % (
            parent.pk, translit(parent.comment, language_code='ru', reversed=True)
        )
        parent_host_name = _norm_name(parent_label)

    own_label = "%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True))
    host_name = _norm_name(own_label)
    mac_addr = device.mac_addr

    stanza = [
        "define host{",
        "\tuse generic-switch",
        "\thost_name %s" % host_name,
        "\taddress %s" % device.ip_address,
    ]
    # Optional directives are simply left out when there is nothing to report.
    if parent_host_name is not None:
        stanza.append("\tparents %s" % parent_host_name)
    if mac_addr is not None:
        stanza.append("\t_mac_addr %s" % mac_addr)
    stanza.append("}\n")
    return '\n'.join(stanza)
class DLinkPort(BasePort):
    """Switch port whose state is toggled through an SNMP worker.

    ``disable`` writes 2 and ``enable`` writes 1 to the
    ``.1.3.6.1.2.1.2.2.1.7.<port number>`` OID
    (presumably IF-MIB ifAdminStatus - confirm).
    """

    def __init__(self, num, name, status, mac, speed, snmp_worker):
        """Initialise the base port, then attach ``snmp_worker``.

        :raises TypeError: if ``snmp_worker`` is not an ``SNMPBaseWorker``.
        """
        BasePort.__init__(self, num, name, status, mac, speed)
        worker_cls = snmp_worker.__class__
        if issubclass(worker_cls, SNMPBaseWorker):
            self.snmp_worker = snmp_worker
        else:
            raise TypeError

    def _write_state(self, value):
        # Single place that builds the per-port OID and performs the SNMP set.
        oid = '.1.3.6.1.2.1.2.2.1.7.%d' % self.num
        self.snmp_worker.set_int_value(oid, value)

    def disable(self):
        """Write 2 to the port state OID."""
        self._write_state(2)

    def enable(self):
        """Write 1 to the port state OID."""
        self._write_state(1)
class DLinkDevice(DevBase, SNMPBaseWorker):
    """D-Link switch driver that reads its data over SNMP."""

    has_attachable_to_subscriber = True
    tech_code = 'dlink_sw'
    description = _('DLink switch')
    is_use_device_port = True

    def __init__(self, dev_instance):
        """Set up both base classes from the stored device record.

        The SNMP worker receives the device IP address, its ``man_passw``
        value and the literal 2 (presumably the SNMP version - confirm).
        """
        DevBase.__init__(self, dev_instance)
        SNMPBaseWorker.__init__(self, dev_instance.ip_address, dev_instance.man_passw, 2)

    def reboot(self):
        """Read OID ``.1.3.6.1.4.1.2021.8.1.101.1`` and return its value."""
        oid = '.1.3.6.1.4.1.2021.8.1.101.1'
        return self.get_item(oid)

    def get_ports(self) -> ListOrError:
        """Collect the switch ports as ``DLinkPort`` objects.

        :return: the list of ports; if one of the SNMP tables is shorter than
            the reported interface count, a tuple of
            ``(DeviceImplementationError, ports collected so far)``.
        """
        port_total = safe_int(self.get_item('.1.3.6.1.2.1.2.1.0'))
        names = tuple(self.get_list('.1.3.6.1.4.1.171.10.134.2.1.1.100.2.1.3'))
        states = tuple(self.get_list('.1.3.6.1.2.1.2.2.1.7'))
        mac_addrs = tuple(self.get_list('.1.3.6.1.2.1.2.2.1.6'))
        speed_values = tuple(self.get_list('.1.3.6.1.2.1.2.2.1.5'))
        ports = []
        try:
            for idx in range(port_total):
                # A state value of 1 means the port is reported as enabled.
                is_enabled = int(states[idx]) == 1
                port_name = names[idx] if names else ''
                port_mac = mac_addrs[idx] if mac_addrs else _('does not fetch the mac')
                port_speed = int(speed_values[idx]) if speed_values else 0
                ports.append(
                    DLinkPort(idx + 1, port_name, is_enabled, port_mac, port_speed, self)
                )
        except IndexError:
            return DeviceImplementationError('Dlink port index error'), ports
        return ports

    def get_device_name(self):
        """Return the value of OID ``.1.3.6.1.2.1.1.1.0``."""
        return self.get_item('.1.3.6.1.2.1.1.1.0')

    def uptime(self) -> timedelta:
        """Return the device uptime as a ``RuTimedelta``.

        The SNMP value is divided by 100 before being used as seconds.
        """
        # NOTE(review): other device classes in this module read
        # '.1.3.6.1.2.1.1.3.0' for uptime - confirm this OID is intended.
        ticks = safe_int(self.get_item('.1.3.6.1.2.1.1.8.0'))
        elapsed = RuTimedelta(timedelta(seconds=ticks / 100))
        # A zero-length interval is falsy; fall back to a fresh empty one.
        return elapsed or RuTimedelta(timedelta())

    def get_template_name(self):
        """Return the template file name used to render this device."""
        return 'generic_switch.html'

    @staticmethod
    def validate_extra_snmp_info(v: str) -> None:
        """Accept any value: this device type needs no extra SNMP info."""
        return None

    def monitoring_template(self, *args, **kwargs) -> Optional[str]:
        """Render the monitoring stanza for the attached device record."""
        return plain_ip_device_mon_template(self.db_instance)

    def register_device(self, extra_data: Dict):
        """Do nothing: no registration step is implemented for this device."""
        return None
class ONUdev(BasePort):
    """Port-like record describing one ONU seen on an OLT.

    Adds a ``signal`` value on top of the base port fields; enabling and
    disabling are no-ops.
    """

    def __init__(self, num, name, status, mac, speed, signal, snmp_worker):
        """Initialise the base port and attach the SNMP worker and signal.

        :raises TypeError: if ``snmp_worker`` is not an ``SNMPBaseWorker``.
        """
        super().__init__(num, name, status, mac, speed)
        worker_cls = snmp_worker.__class__
        if not issubclass(worker_cls, SNMPBaseWorker):
            raise TypeError
        self.snmp_worker, self.signal = snmp_worker, signal

    def disable(self):
        """Not implemented for ONU records; does nothing."""
        return None

    def enable(self):
        """Not implemented for ONU records; does nothing."""
        return None

    def __str__(self):
        fields = (self.num, self.nm, self.mac())
        return "%d: '%s' %s" % fields
class OLTDevice(DevBase, SNMPBaseWorker):
    """PON OLT driver that lists its ONUs over SNMP."""

    has_attachable_to_subscriber = False
    description = _('PON OLT')
    is_use_device_port = False

    def __init__(self, dev_instance):
        """Set up both base classes from the stored device record.

        The SNMP worker receives the device IP address, its ``man_passw``
        value and the literal 2 (presumably the SNMP version - confirm).
        """
        DevBase.__init__(self, dev_instance)
        SNMPBaseWorker.__init__(self, dev_instance.ip_address, dev_instance.man_passw, 2)

    def reboot(self):
        """Not implemented; does nothing."""
        return None

    def get_ports(self) -> ListOrError:
        """Build an ``ONUdev`` for every index listed by the OLT.

        :return: list of ``ONUdev`` objects; on an SNMP timeout, a tuple of
            ``(EasySNMPTimeoutError, ONUs collected so far)``.
        """
        onu_indexes = self.get_list('.1.3.6.1.4.1.3320.101.10.1.1.79')
        onus = []
        try:
            for raw_index in onu_indexes:
                idx = int(raw_index)
                raw_status = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.26.%d' % idx)
                raw_signal = self.get_item('.1.3.6.1.4.1.3320.101.10.5.1.5.%d' % idx)
                onu_name = self.get_item('.1.3.6.1.2.1.2.2.1.2.%d' % idx)
                onu_mac = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.3.%d' % idx)
                onus.append(ONUdev(
                    num=idx,
                    name=onu_name,
                    # Only the status value '3' is treated as "up".
                    status=raw_status == '3',
                    mac=onu_mac,
                    speed=0,
                    signal=int(raw_signal or 0),
                    snmp_worker=self,
                ))
        except EasySNMPTimeoutError as e:
            message = "%s (%s)" % (gettext('wait for a reply from the SNMP Timeout'), e)
            return EasySNMPTimeoutError(message), onus
        return onus

    def get_device_name(self):
        """Return the value of OID ``.1.3.6.1.2.1.1.5.0``."""
        return self.get_item('.1.3.6.1.2.1.1.5.0')

    def uptime(self):
        """Return the device uptime as a ``RuTimedelta``.

        The SNMP value is divided by 100 before being used as seconds.
        """
        ticks = safe_int(self.get_item('.1.3.6.1.2.1.1.9.1.4.1'))
        elapsed = RuTimedelta(timedelta(seconds=ticks / 100))
        # A zero-length interval is falsy; fall back to a fresh empty one.
        return elapsed or RuTimedelta(timedelta())

    def get_template_name(self):
        """Return the template file name used to render this device."""
        return 'olt.html'

    @staticmethod
    def validate_extra_snmp_info(v: str) -> None:
        """Accept any value: an OLT needs no extra SNMP info."""
        return None

    def monitoring_template(self, *args, **kwargs) -> Optional[str]:
        """Render the monitoring stanza for the attached device record."""
        return plain_ip_device_mon_template(self.db_instance)

    def register_device(self, extra_data: Dict):
        """Do nothing: no registration step is implemented for this device."""
        return None
class OnuDevice(DevBase, SNMPBaseWorker):
    """PON ONU driver.

    SNMP requests go to the ONU's own IP address when it has one, otherwise
    to the IP address of its parent device. The ``snmp_extra`` field of the
    device record holds the integer index used to build the per-ONU OIDs.
    """

    has_attachable_to_subscriber = True
    description = _('PON ONU')
    tech_code = 'bdcom_onu'
    is_use_device_port = False

    def __init__(self, dev_instance):
        """Pick the address to poll and initialise both base classes.

        :raises DeviceImplementationError: if neither the device nor its
            parent device has an IP address.
        """
        DevBase.__init__(self, dev_instance)
        dev_ip_addr = None
        if dev_instance.ip_address:
            dev_ip_addr = dev_instance.ip_address
        else:
            parent_device = dev_instance.parent_dev
            if parent_device is not None and parent_device.ip_address:
                dev_ip_addr = parent_device.ip_address
        if dev_ip_addr is None:
            raise DeviceImplementationError(gettext(
                'Ip address or parent device with ip address required for ONU device'
            ))
        SNMPBaseWorker.__init__(self, dev_ip_addr, dev_instance.man_passw, 2)

    def reboot(self):
        """Not implemented; does nothing."""
        pass

    def get_ports(self) -> ListOrError:
        """Return an empty tuple: no ports are reported for an ONU."""
        return ()

    def get_device_name(self):
        """Not implemented; returns None."""
        pass

    def uptime(self):
        """Not implemented; returns None."""
        pass

    def get_template_name(self):
        """Return the template file name used to render this device."""
        return "onu.html"

    def get_details(self):
        """Read status, signal, distance, MAC and name of the ONU over SNMP.

        :return: ``None`` when there is no device record, when ``snmp_extra``
            converts to 0, or when the status value is missing or not a
            digit string; a dict with an ``'err'`` key on SNMP timeout;
            otherwise a dict with ``status``, ``signal``, ``name``, ``mac``
            and ``distance``.
        """
        if self.db_instance is None:
            return
        num = safe_int(self.db_instance.snmp_extra)
        if num == 0:
            return
        try:
            status = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.26.%d' % num)
            signal = self.get_item('.1.3.6.1.4.1.3320.101.10.5.1.5.%d' % num)
            distance = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.27.%d' % num)
            raw_mac = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.3.%d' % num)
            # Other code in this module treats get_item() results as possibly
            # None, so a missing value must not crash here: an absent MAC
            # becomes an empty string.
            # NOTE(review): '%x' does not zero-pad octets (e.g. '0:1a:...');
            # kept as is because consumers may rely on this format - confirm.
            mac = ':'.join('%x' % ord(i) for i in (raw_mac or ''))
            signal = safe_int(signal)
            # A missing status is handled like a non-numeric one: no details.
            if status is not None and status.isdigit():
                has_distance = distance is not None and distance.isdigit()
                return {
                    'status': status,
                    'signal': signal / 10 if signal != 0 else 0,
                    'name': self.get_item('.1.3.6.1.2.1.2.2.1.2.%d' % num),
                    'mac': mac,
                    'distance': int(distance) / 10 if has_distance else 0
                }
        except EasySNMPTimeoutError as e:
            return {'err': "%s: %s" % (_('ONU not connected'), e)}

    @staticmethod
    def validate_extra_snmp_info(v: str) -> None:
        """Check that the extra SNMP field parses as an integer.

        :raises TlnValidationError: if ``int(v)`` raises ``ValueError``.
        """
        # The extra SNMP field of this ONU type must be an integer.
        try:
            int(v)
        except ValueError:
            raise TlnValidationError(_('Onu snmp field must be en integer'))

    def monitoring_template(self, *args, **kwargs) -> Optional[str]:
        """Render a ``define host{ ... }`` monitoring stanza for this ONU.

        The address is the device IP, or the parent device IP when the device
        has none; lines without a value are left out.

        :return: the stanza text, or ``None`` when there is no device record.
        """
        device = self.db_instance
        if not device:
            return
        host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
        snmp_item = device.snmp_extra
        mac = device.mac_addr
        if device.ip_address:
            address = device.ip_address
        elif device.parent_dev:
            address = device.parent_dev.ip_address
        else:
            address = None
        r = (
            "define host{",
            "\tuse device-onu",
            "\thost_name %s" % host_name,
            "\taddress %s" % address if address else None,
            "\t_snmp_item %s" % snmp_item if snmp_item is not None else '',
            "\t_mac_addr %s" % mac if mac is not None else '',
            "}\n"
        )
        # Falsy entries (None or '') mark directives that have no value.
        return '\n'.join(i for i in r if i)

    def register_device(self, extra_data: Dict):
        """Do nothing: no registration step is implemented for this device."""
        pass
class EltexPort(BasePort):
    """Switch port whose state is toggled through an SNMP worker.

    ``disable`` writes 2 and ``enable`` writes 1 to the
    ``.1.3.6.1.2.1.2.2.1.7.<port number>`` OID
    (presumably IF-MIB ifAdminStatus - confirm).
    """

    def __init__(self, snmp_worker, *args, **kwargs):
        """Initialise the base port with the remaining arguments.

        :raises TypeError: if ``snmp_worker`` is not an ``SNMPBaseWorker``.
        """
        BasePort.__init__(self, *args, **kwargs)
        worker_cls = snmp_worker.__class__
        if issubclass(worker_cls, SNMPBaseWorker):
            self.snmp_worker = snmp_worker
        else:
            raise TypeError

    def _write_state(self, value):
        # Single place that builds the per-port OID and performs the SNMP set.
        oid = '.1.3.6.1.2.1.2.2.1.7.%d' % self.num
        self.snmp_worker.set_int_value(oid, value)

    def disable(self):
        """Write 2 to the port state OID."""
        self._write_state(2)

    def enable(self):
        """Write 1 to the port state OID."""
        self._write_state(1)
class EltexSwitch(DLinkDevice):
    """Eltex switch driver built on top of the D-Link one."""

    description = _('Eltex switch')
    is_use_device_port = False
    has_attachable_to_subscriber = True
    tech_code = 'eltex_sw'

    def get_ports(self) -> ListOrError:
        """Return 28 ``EltexPort`` objects, numbered from 1.

        Port ``i`` is read from SNMP interface index ``48 + i`` (49..76).
        """
        ports = []
        for port_num in range(1, 29):
            if_index = port_num + 48
            raw_speed = self.get_item('.1.3.6.1.2.1.2.2.1.5.%d' % if_index)
            port_name = self.get_item('.1.3.6.1.2.1.31.1.1.1.18.%d' % if_index)
            # NOTE(review): the raw SNMP value is passed as the status here,
            # whereas DLinkDevice passes a bool - confirm BasePort copes.
            port_status = self.get_item('.1.3.6.1.2.1.2.2.1.8.%d' % if_index)
            port_mac = self.get_item('.1.3.6.1.2.1.2.2.1.6.%d' % if_index)
            port_speed = int(raw_speed or 0)
            ports.append(
                EltexPort(self, port_num, port_name, port_status, port_mac, port_speed)
            )
        return ports

    def get_device_name(self):
        """Return the value of OID ``.1.3.6.1.2.1.1.5.0``."""
        return self.get_item('.1.3.6.1.2.1.1.5.0')

    def uptime(self):
        """Return the device uptime as a ``RuTimedelta``.

        The SNMP value is divided by 100 before being used as seconds.
        """
        ticks = safe_int(self.get_item('.1.3.6.1.2.1.1.3.0'))
        elapsed = RuTimedelta(timedelta(seconds=ticks / 100))
        # A zero-length interval is falsy; fall back to a fresh empty one.
        return elapsed or RuTimedelta(timedelta())

    def monitoring_template(self, *args, **kwargs) -> Optional[str]:
        """Render the monitoring stanza for the attached device record."""
        return plain_ip_device_mon_template(self.db_instance)
def conv_signal(lvl: int) -> float:
    """Convert a raw signal reading to a level rounded to two decimals.

    * ``65535`` maps to ``0.0``.
    * values strictly between 0 and 30000 map to ``lvl * 0.002 - 30``.
    * values strictly between 60000 and 65534 map to
      ``(lvl - 65534) * 0.002 - 30``.
    * anything else maps to 0.

    The result is presumably in dBm - confirm against the device MIB.
    """
    if lvl == 65535:
        return 0.0
    if 0 < lvl < 30000:
        return round(lvl * 0.002 - 30, 2)
    if 60000 < lvl < 65534:
        return round((lvl - 65534) * 0.002 - 30, 2)
    # Readings outside both ranges carry no usable level.
    return 0
class Olt_ZTE_C320(OLTDevice):
    """ZTE C320 OLT driver: fibers, ONUs per fiber and unregistered units."""

    description = _('OLT ZTE C320')

    def get_fibers(self):
        """Return a lazy iterator of dicts describing each fiber.

        Each dict has ``fb_id``, ``fb_name`` and ``fb_onu_num``; the last one
        is fetched with a separate SNMP request while iterating.
        """
        fiber_pairs = self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.13.1.1.1')
        return (
            {
                'fb_id': fb_id,
                'fb_name': fb_name,
                'fb_onu_num': safe_int(
                    self.get_item('.1.3.6.1.4.1.3902.1012.3.13.1.1.13.%d' % int(fb_id))
                ),
            }
            for fb_name, fb_id in fiber_pairs
        )

    def get_ports_on_fiber(self, fiber_num: int) -> Iterable:
        """Return a lazy iterator of dicts describing the ONUs on a fiber.

        The five SNMP tables are requested up front and combined with
        ``zip``, so the result is as long as the shortest of them.
        """
        types = self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.28.1.1.1.%d' % fiber_num)
        ports = self.get_list('.1.3.6.1.4.1.3902.1012.3.28.1.1.2.%d' % fiber_num)
        signals = self.get_list('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.10.%d' % fiber_num)
        serials = self.get_list('.1.3.6.1.4.1.3902.1012.3.28.1.1.5.%d' % fiber_num)
        prefixes = self.get_list('.1.3.6.1.4.1.3902.1012.3.50.11.2.1.1.%d' % fiber_num)
        rows = zip(types, ports, signals, serials, prefixes)
        return (
            {
                'onu_type': type_and_num[0],
                'onu_port': port,
                'onu_signal': conv_signal(safe_int(raw_signal)),
                # The serial is the prefix plus the last 4 characters of the
                # raw value, each rendered as two upper-case hex digits.
                'onu_sn': prefix + ''.join('%.2X' % ord(ch) for ch in raw_sn[-4:]),
                'snmp_extra': "%d.%d" % (fiber_num, safe_int(type_and_num[1])),
            }
            for type_and_num, port, raw_signal, raw_sn, prefix in rows
        )

    def get_units_unregistered(self, fiber_num: int) -> Iterable:
        """Return a lazy iterator of dicts for unregistered units on a fiber.

        Each dict has ``mac`` (hex of the last 6 characters of the serial),
        ``firmware_ver``, ``loid_passw``, ``loid`` and ``sn``.
        """
        serial_pairs = self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.13.3.1.2.%d' % fiber_num)
        firmwares = self.get_list('.1.3.6.1.4.1.3902.1012.3.13.3.1.11.%d' % fiber_num)
        passwords = self.get_list('.1.3.6.1.4.1.3902.1012.3.13.3.1.9.%d' % fiber_num)
        loid_values = self.get_list('.1.3.6.1.4.1.3902.1012.3.13.3.1.8.%d' % fiber_num)
        rows = zip(firmwares, passwords, loid_values, serial_pairs)
        return (
            {
                'mac': ':'.join('%x' % ord(ch) for ch in serial[-6:]),
                'firmware_ver': firmware,
                'loid_passw': password,
                'loid': loid_value,
                'sn': serial,
            }
            for firmware, password, loid_value, (serial, _num) in rows
        )

    def uptime(self):
        """Return the device uptime as a ``RuTimedelta``.

        The SNMP value is divided by 100 before being used as seconds.
        """
        ticks = safe_int(self.get_item('.1.3.6.1.2.1.1.3.0'))
        elapsed = RuTimedelta(timedelta(seconds=ticks / 100))
        # A zero-length interval is falsy; fall back to a fresh empty one.
        return elapsed or RuTimedelta(timedelta())

    def get_long_description(self):
        """Return the value of OID ``.1.3.6.1.2.1.1.1.0``."""
        return self.get_item('.1.3.6.1.2.1.1.1.0')

    def get_hostname(self):
        """Return the value of OID ``.1.3.6.1.2.1.1.5.0``."""
        return self.get_item('.1.3.6.1.2.1.1.5.0')

    def get_template_name(self):
        """Return the template file name used to render this device."""
        return 'olt_ztec320.html'
class ZteOnuDevice(OnuDevice):
    """ZTE PON ONU driver.

    The ``snmp_extra`` field of the device record holds two dot-separated
    integers: the SNMP fiber number and the ONU number on that fiber.
    """

    description = _('ZTE PON ONU')
    tech_code = 'zte_onu'

    def get_details(self) -> Optional[Dict]:
        """Read the ONU state from SNMP.

        :return: ``None`` when there is no device record, when ``snmp_extra``
            is empty, or when an ``IndexError`` occurs; otherwise a dict with
            ``status``, ``signal``, ``distance``, ``ip_addr``, ``vlans``,
            ``serial``, ``int_name`` and ``onu_type``.
        """
        instance = self.db_instance
        if instance is None:
            return None
        extra = instance.snmp_extra
        if not extra:
            return None
        try:
            fiber_part, onu_part = extra.split('.')
            fiber_addr = '%d.%d' % (int(fiber_part), int(onu_part))

            def read(oid_template):
                # Fill the "<fiber>.<onu>" pair into the OID and fetch it.
                return self.get_item(oid_template % fiber_addr)

            status = read('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.1.%s.1')
            signal = read('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.10.%s.1')
            distance = read('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.18.%s.1')
            ip_addr = read('.1.3.6.1.4.1.3902.1012.3.50.16.1.1.10.%s')
            vlans = read('.1.3.6.1.4.1.3902.1012.3.50.15.100.1.1.7.%s.1.1')
            int_name = read('.1.3.6.1.4.1.3902.1012.3.28.1.1.3.%s')
            onu_type = read('.1.3.6.1.4.1.3902.1012.3.28.1.1.1.%s')
            serial = read('.1.3.6.1.4.1.3902.1012.3.28.1.1.5.%s')
            if serial is not None:
                # Last 4 characters of the raw value, as upper-case hex.
                serial = 'ZTEG%s' % ''.join('%.2X' % ord(ch) for ch in serial[-4:])
            return {
                'status': status,
                'signal': conv_signal(safe_int(signal)),
                'distance': safe_int(distance) / 10,
                'ip_addr': ip_addr,
                'vlans': vlans,
                'serial': serial,
                'int_name': int_name,
                'onu_type': onu_type
            }
        except IndexError:
            return None

    def get_template_name(self):
        """Return the template file name used to render this device."""
        return 'onu_for_zte.html'

    @staticmethod
    def validate_extra_snmp_info(v: str) -> None:
        """Check that ``v`` is two dot-separated integers, e.g. 268501760.5.

        :raises TlnValidationError: if splitting or integer parsing raises
            ``ValueError``.
        """
        try:
            fiber_part, port_part = v.split('.')
            int(fiber_part)
            int(port_part)
        except ValueError:
            raise TlnValidationError(_('Zte onu snmp field must be two dot separated integers'))

    def monitoring_template(self, *args, **kwargs) -> Optional[str]:
        """Render a ``define host{ ... }`` monitoring stanza for this ONU.

        The address is the device IP, or the parent device IP when the device
        has none; lines without a value are left out.

        :return: the stanza text, or ``None`` when there is no device record.
        """
        device = self.db_instance
        if not device:
            return None
        label = "%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True))
        host_name = _norm_name(label)
        snmp_item = device.snmp_extra
        mac = device.mac_addr

        address = None
        if device.ip_address:
            address = device.ip_address
        elif device.parent_dev:
            address = device.parent_dev.ip_address

        stanza = [
            "define host{",
            "\tuse dev-onu-zte-f660",
            "\thost_name %s" % host_name,
        ]
        if address:
            stanza.append("\taddress %s" % address)
        if snmp_item is not None:
            stanza.append("\t_snmp_item %s" % snmp_item)
        if mac is not None:
            stanza.append("\t_mac_addr %s" % mac)
        stanza.append("}\n")
        return '\n'.join(stanza)

    def register_device(self, extra_data: Dict):
        """Register the ONU on its OLT and store the resulting SNMP address.

        ``extra_data`` must contain a ``telnet`` section with ``login``,
        ``password`` and ``prompt``, plus ``default_vid``. Nothing happens
        when neither the device nor its parent has an IP address.

        :raises DeviceConfigurationError: if ``extra_data`` is empty or any
            of the required values is missing.
        """
        if not extra_data:
            raise DeviceConfigurationError(_('You have not info in extra_data field, please fill it in JSON'))
        device = self.db_instance
        ip = None
        if device.ip_address:
            ip = device.ip_address
        elif device.parent_dev:
            ip = device.parent_dev.ip_address
        if not ip:
            return None

        mac = str(device.mac_addr).encode()
        # The serial number is rebuilt from the last four octets of the MAC
        # address, because the stored MAC was derived from the serial number.
        serial_tail = b''.join(
            b'%.2X' % int(octet, base=16) for octet in mac.split(b':')[-4:]
        )
        sn = b"ZTEG%s" % serial_tail

        telnet = extra_data.get('telnet')
        if telnet is None:
            raise DeviceConfigurationError('For ZTE configuration needed "telnet" section in extra_data')
        login = telnet.get('login')
        password = telnet.get('password')
        prompt = telnet.get('prompt')
        default_vid = extra_data.get('default_vid')
        if None in (login, password, prompt):
            raise DeviceConfigurationError('For ZTE configuration needed login, password and'
                                           ' prompt for telnet access in extra_data')
        if default_vid is None:
            raise DeviceConfigurationError('Please specify default vlan id "default_vid" for configuration onu')

        _stack_num, rack_num, fiber_num, new_onu_port_num = register_onu_ZTE_F660(
            olt_ip=ip, onu_sn=sn, login_passwd=(login.encode(), password.encode()),
            onu_mac=mac, prompt_title=prompt.encode(), vlan_id=int(default_vid)
        )
        # SNMP fiber number layout in binary: '10000', rack number (8 bits),
        # fiber number (8 bits), then eight zero bits.
        fiber_bits = "10000{0:08b}{1:08b}00000000".format(rack_num, fiber_num)
        device.snmp_extra = "%d.%d" % (int(fiber_bits, base=2), new_onu_port_num)
        device.save(update_fields=('snmp_extra',))

    def get_fiber_str(self):
        """Return the ``gpon-onu_1/<rack>/<fiber>:<port>`` interface name.

        The rack and fiber numbers are decoded from the binary form of the
        SNMP fiber number stored in ``snmp_extra``.

        :return: the interface name, or ``None`` when there is no device
            record or ``snmp_extra`` has no dot in it.
        """
        dev = self.db_instance
        if not dev:
            return None
        extra = dev.snmp_extra
        if not extra or '.' not in extra:
            return None
        fiber_part, onu_port_num = extra.split('.')
        bits = bin(int(fiber_part))[2:]
        # Bits 5..12 hold the rack number, bits 13..20 the fiber number.
        rack_num = int(bits[5:13], 2)
        fiber_num = int(bits[13:21], 2)
        return 'gpon-onu_1/%d/%d:%s' % (rack_num, fiber_num, onu_port_num)