import re import re from typing import AnyStr, Iterable, Optional, Dict from datetime import timedelta from easysnmp import EasySNMPTimeoutError from pexpect import TIMEOUT from transliterate import translit from django.utils.translation import gettext_lazy as _, gettext from djing.lib import RuTimedelta, safe_int from devapp.onu_config import register_f601_onu, register_f660_onu, ExpectValidationError, OnuZteRegisterError from .base_intr import ( DevBase, SNMPBaseWorker, BasePort, DeviceImplementationError, ListOrError, DeviceConfigurationError ) def _norm_name(name: str, replreg=None): if replreg is None: return re.sub(pattern='\W{1,255}', repl='', string=name, flags=re.IGNORECASE) return replreg.sub('', name) def plain_ip_device_mon_template(device) -> Optional[AnyStr]: if not device: raise ValueError parent_host_name = _norm_name("%d%s" % ( device.parent_dev.pk, translit(device.parent_dev.comment, language_code='ru', reversed=True) )) if device.parent_dev else None host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True))) mac_addr = device.mac_addr r = ( "define host{", "\tuse generic-switch", "\thost_name %s" % host_name, "\taddress %s" % device.ip_address, "\tparents %s" % parent_host_name if parent_host_name is not None else '', "\t_mac_addr %s" % mac_addr if mac_addr is not None else '', "}\n" ) return '\n'.join(i for i in r if i) class DLinkPort(BasePort): def __init__(self, snmp_worker, *args, **kwargs): BasePort.__init__(self, writable=True, *args, **kwargs) if not issubclass(snmp_worker.__class__, SNMPBaseWorker): raise TypeError self.snmp_worker = snmp_worker def disable(self): self.snmp_worker.set_int_value( "%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num), 2 ) def enable(self): self.snmp_worker.set_int_value( "%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num), 1 ) class DLinkDevice(DevBase, SNMPBaseWorker): has_attachable_to_subscriber = True tech_code = 'dlink_sw' description = _('DLink switch') is_use_device_port = True def __init__(self, dev_instance): DevBase.__init__(self, dev_instance) SNMPBaseWorker.__init__(self, dev_instance.ip_address, dev_instance.man_passw, 2) def reboot(self): return self.get_item('.1.3.6.1.4.1.2021.8.1.101.1') def get_ports(self) -> ListOrError: interfaces_count = safe_int(self.get_item('.1.3.6.1.2.1.2.1.0')) nams = tuple(self.get_list('.1.3.6.1.4.1.171.10.134.2.1.1.100.2.1.3')) stats = tuple(self.get_list('.1.3.6.1.2.1.2.2.1.7')) macs = tuple(self.get_list('.1.3.6.1.2.1.2.2.1.6')) speeds = tuple(self.get_list('.1.3.6.1.2.1.2.2.1.5')) try: for n in range(interfaces_count): status = True if int(stats[n]) == 1 else False yield DLinkPort( num=n + 1, name=nams[n] if len(nams) > 0 else '', status=status, mac=macs[n] if len(macs) > 0 else _('does not fetch the mac'), speed=int(speeds[n]) if len(speeds) > 0 else 0, snmp_worker=self) except IndexError: return DeviceImplementationError('Dlink port index error') def get_device_name(self): return self.get_item('.1.3.6.1.2.1.1.1.0') def uptime(self) -> timedelta: uptimestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.8.0')) tm = RuTimedelta(timedelta(seconds=uptimestamp / 100)) or RuTimedelta(timedelta()) return tm def get_template_name(self): return 'generic_switch.html' @staticmethod def validate_extra_snmp_info(v: str) -> None: # Dlink has no require snmp info pass def monitoring_template(self, *args, **kwargs) -> Optional[str]: device = self.db_instance return plain_ip_device_mon_template(device) def register_device(self, extra_data: Dict): pass class ONUdev(BasePort): def __init__(self, signal, snmp_worker, *args, **kwargs): super(ONUdev, self).__init__(*args, **kwargs) if not issubclass(snmp_worker.__class__, SNMPBaseWorker): raise TypeError self.snmp_worker = snmp_worker self.signal = signal def disable(self): pass def enable(self): pass def __str__(self): return "%d: '%s' %s" % (self.num, self.nm, self.mac()) class OLTDevice(DevBase, SNMPBaseWorker): has_attachable_to_subscriber = False description = 'PON OLT' is_use_device_port = False def __init__(self, dev_instance): DevBase.__init__(self, dev_instance) SNMPBaseWorker.__init__(self, dev_instance.ip_address, dev_instance.man_passw, 2) def reboot(self): pass def get_ports(self) -> ListOrError: nms = self.get_list('.1.3.6.1.4.1.3320.101.10.1.1.79') res = [] try: for nm in nms: n = int(nm) status = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.26.%d' % n) signal = self.get_item('.1.3.6.1.4.1.3320.101.10.5.1.5.%d' % n) onu = ONUdev( num=n, name=self.get_item('.1.3.6.1.2.1.2.2.1.2.%d' % n), status=True if status == '3' else False, mac=self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.3.%d' % n), speed=0, signal=int(signal or 0), snmp_worker=self) res.append(onu) except EasySNMPTimeoutError as e: return EasySNMPTimeoutError( "%s (%s)" % (gettext('wait for a reply from the SNMP Timeout'), e) ), res return res def get_device_name(self): return self.get_item('.1.3.6.1.2.1.1.5.0') def uptime(self): up_timestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.9.1.4.1')) tm = RuTimedelta(timedelta(seconds=up_timestamp / 100)) or RuTimedelta(timedelta()) return tm def get_template_name(self): return 'olt.html' @staticmethod def validate_extra_snmp_info(v: str) -> None: # Olt has no require snmp info pass def monitoring_template(self, *args, **kwargs) -> Optional[str]: device = self.db_instance return plain_ip_device_mon_template(device) def register_device(self, extra_data: Dict): pass class OnuDevice(DevBase, SNMPBaseWorker): has_attachable_to_subscriber = True description = 'PON ONU BDCOM' tech_code = 'bdcom_onu' is_use_device_port = False def __init__(self, dev_instance): DevBase.__init__(self, dev_instance) dev_ip_addr = None if dev_instance.ip_address: dev_ip_addr = dev_instance.ip_address else: parent_device = dev_instance.parent_dev if parent_device is not None and parent_device.ip_address: dev_ip_addr = parent_device.ip_address if dev_ip_addr is None: raise DeviceImplementationError(gettext( 'Ip address or parent device with ip address required for ONU device' )) SNMPBaseWorker.__init__(self, dev_ip_addr, dev_instance.man_passw, 2) def reboot(self): pass def get_ports(self) -> ListOrError: return () def get_device_name(self): pass def uptime(self): pass def get_template_name(self): return "onu.html" def get_details(self): if self.db_instance is None: return num = safe_int(self.db_instance.snmp_extra) if num == 0: return try: status = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.26.%d' % num) signal = self.get_item('.1.3.6.1.4.1.3320.101.10.5.1.5.%d' % num) distance = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.27.%d' % num) mac = self.get_item('.1.3.6.1.4.1.3320.101.10.1.1.3.%d' % num) if mac is not None: mac = ':'.join('%x' % ord(i) for i in mac) # uptime = self.get_item('.1.3.6.1.2.1.2.2.1.9.%d' % num) signal = safe_int(signal) if status is not None and status.isdigit(): return { 'status': status, 'signal': signal / 10 if signal != 0 else 0, 'name': self.get_item('.1.3.6.1.2.1.2.2.1.2.%d' % num), 'mac': mac, 'distance': int(distance) / 10 if distance.isdigit() else 0 } except EasySNMPTimeoutError as e: return {'err': "%s: %s" % (_('ONU not connected'), e)} @staticmethod def validate_extra_snmp_info(v: str) -> None: # DBCOM Onu have en integer snmp port try: int(v) except ValueError: raise ExpectValidationError(_('Onu snmp field must be en integer')) def monitoring_template(self, *args, **kwargs) -> Optional[str]: device = self.db_instance if not device: return host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True))) snmp_item = device.snmp_extra mac = device.mac_addr if device.ip_address: address = device.ip_address elif device.parent_dev: address = device.parent_dev.ip_address else: address = None r = ( "define host{", "\tuse device-onu", "\thost_name %s" % host_name, "\taddress %s" % address if address else None, "\t_snmp_item %s" % snmp_item if snmp_item is not None else '', "\t_mac_addr %s" % mac if mac is not None else '', "}\n" ) return '\n'.join(i for i in r if i) def register_device(self, extra_data: Dict): pass class EltexPort(BasePort): def __init__(self, snmp_worker, *args, **kwargs): BasePort.__init__(self, *args, **kwargs) if not issubclass(snmp_worker.__class__, SNMPBaseWorker): raise TypeError self.snmp_worker = snmp_worker def disable(self): self.snmp_worker.set_int_value( "%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num), 2 ) def enable(self): self.snmp_worker.set_int_value( "%s.%d" % ('.1.3.6.1.2.1.2.2.1.7', self.num), 1 ) class EltexSwitch(DLinkDevice): description = _('Eltex switch') is_use_device_port = False has_attachable_to_subscriber = True tech_code = 'eltex_sw' def get_ports(self) -> ListOrError: for i, n in enumerate(range(49, 77), 1): speed = self.get_item('.1.3.6.1.2.1.2.2.1.5.%d' % n) yield EltexPort(self, num=i, name=self.get_item('.1.3.6.1.2.1.31.1.1.1.18.%d' % n), status=self.get_item('.1.3.6.1.2.1.2.2.1.8.%d' % n), mac=self.get_item('.1.3.6.1.2.1.2.2.1.6.%d' % n), speed=int(speed or 0) ) def get_device_name(self): return self.get_item('.1.3.6.1.2.1.1.5.0') def uptime(self): uptimestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.3.0')) tm = RuTimedelta(timedelta(seconds=uptimestamp / 100)) or RuTimedelta(timedelta()) return tm def monitoring_template(self, *args, **kwargs) -> Optional[str]: device = self.db_instance return plain_ip_device_mon_template(device) def conv_signal(lvl: int) -> float: if lvl == 65535: return 0.0 r = 0 if 0 < lvl < 30000: r = lvl * 0.002 - 30 elif 60000 < lvl < 65534: r = (lvl - 65534) * 0.002 - 30 return round(r, 2) class Olt_ZTE_C320(OLTDevice): description = 'OLT ZTE C320' def get_fibers(self): fibers = ({ 'fb_id': fiber_id, 'fb_name': fiber_name, 'fb_onu_num': safe_int(self.get_item('.1.3.6.1.4.1.3902.1012.3.13.1.1.13.%d' % int(fiber_id))) } for fiber_name, fiber_id in self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.13.1.1.1')) return fibers def get_ports_on_fiber(self, fiber_num: int) -> Iterable: onu_types = self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.28.1.1.1.%d' % fiber_num) onu_ports = self.get_list('.1.3.6.1.4.1.3902.1012.3.28.1.1.2.%d' % fiber_num) onu_signals = self.get_list('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.10.%d' % fiber_num) # Real sn in last 3 octets onu_sns = self.get_list('.1.3.6.1.4.1.3902.1012.3.28.1.1.5.%d' % fiber_num) onu_prefixs = self.get_list('.1.3.6.1.4.1.3902.1012.3.50.11.2.1.1.%d' % fiber_num) onu_list = ({ 'onu_type': onu_type_num[0], 'onu_port': onu_port, 'onu_signal': conv_signal(safe_int(onu_signal)), 'onu_sn': onu_prefix + ''.join('%.2X' % ord(i) for i in onu_sn[-4:]), # Real sn in last 4 octets, 'snmp_extra': "%d.%d" % (fiber_num, safe_int(onu_type_num[1])), } for onu_type_num, onu_port, onu_signal, onu_sn, onu_prefix in zip( onu_types, onu_ports, onu_signals, onu_sns, onu_prefixs )) return onu_list def get_units_unregistered(self, fiber_num: int) -> Iterable: sn_num_list = self.get_list_keyval('.1.3.6.1.4.1.3902.1012.3.13.3.1.2.%d' % fiber_num) firmware_ver = self.get_list('.1.3.6.1.4.1.3902.1012.3.13.3.1.11.%d' % fiber_num) loid_passws = self.get_list('.1.3.6.1.4.1.3902.1012.3.13.3.1.9.%d' % fiber_num) loids = self.get_list('.1.3.6.1.4.1.3902.1012.3.13.3.1.8.%d' % fiber_num) return ({ 'mac': ':'.join('%x' % ord(i) for i in sn[-6:]), 'firmware_ver': frm_ver, 'loid_passw': loid_passw, 'loid': loid, 'sn': sn } for frm_ver, loid_passw, loid, (sn, num) in zip( firmware_ver, loid_passws, loids, sn_num_list )) def uptime(self): up_timestamp = safe_int(self.get_item('.1.3.6.1.2.1.1.3.0')) tm = RuTimedelta(timedelta(seconds=up_timestamp / 100)) or RuTimedelta(timedelta()) return tm def get_long_description(self): return self.get_item('.1.3.6.1.2.1.1.1.0') def get_hostname(self): return self.get_item('.1.3.6.1.2.1.1.5.0') def get_template_name(self): return 'olt_ztec320.html' def _reg_dev_zte(device, extra_data: Dict, reg_func): if not extra_data: raise DeviceConfigurationError(_('You have not info in extra_data ' 'field, please fill it in JSON')) ip = None if device.ip_address: ip = device.ip_address elif device.parent_dev: ip = device.parent_dev.ip_address if ip: mac = str(device.mac_addr) if device.mac_addr else None # Format serial number from mac address # because saved mac address was make from serial number sn = "ZTEG%s" % ''.join('%.2X' % int(x, base=16) for x in mac.split(':')[-4:]) telnet = extra_data.get('telnet') try: onu_snmp = reg_func( onu_mac=mac, serial=sn, zte_ip_addr=str(ip), telnet_login=telnet.get('login'), telnet_passw=telnet.get('password'), telnet_prompt=telnet.get('prompt'), onu_vlan=extra_data.get('default_vid') ) if onu_snmp is not None: device.snmp_extra = onu_snmp device.save(update_fields=('snmp_extra',)) else: raise DeviceConfigurationError('unregistered onu not found, sn=%s' % sn) except TIMEOUT as e: raise OnuZteRegisterError(e) else: raise DeviceConfigurationError('not have ip') class ZteOnuDevice(OnuDevice): description = 'Zte ONU F660' tech_code = 'zte_onu' def get_details(self) -> Optional[Dict]: if self.db_instance is None: return snmp_extra = self.db_instance.snmp_extra if not snmp_extra: return try: fiber_num, onu_num = snmp_extra.split('.') fiber_num, onu_num = int(fiber_num), int(onu_num) fiber_addr = '%d.%d' % (fiber_num, onu_num) status = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.1.%s.1' % fiber_addr) signal = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.10.%s.1' % fiber_addr) distance = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.12.1.1.18.%s.1' % fiber_addr) ip_addr = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.16.1.1.10.%s' % fiber_addr) vlans = self.get_item('.1.3.6.1.4.1.3902.1012.3.50.15.100.1.1.7.%s.1.1' % fiber_addr) int_name = self.get_item('.1.3.6.1.4.1.3902.1012.3.28.1.1.3.%s' % fiber_addr) onu_type = self.get_item('.1.3.6.1.4.1.3902.1012.3.28.1.1.1.%s' % fiber_addr) sn = self.get_item('.1.3.6.1.4.1.3902.1012.3.28.1.1.5.%s' % fiber_addr) if sn is not None: sn = 'ZTEG%s' % ''.join('%.2X' % ord(x) for x in sn[-4:]) return { 'status': status, 'signal': conv_signal(safe_int(signal)), 'distance': safe_int(distance) / 10, 'ip_addr': ip_addr, 'vlans': vlans, 'serial': sn, 'int_name': int_name, 'onu_type': onu_type } except IndexError: pass def get_template_name(self): return 'onu_for_zte.html' @staticmethod def validate_extra_snmp_info(v: str) -> None: # for example 268501760.5 try: fiber_num, onu_port = v.split('.') int(fiber_num), int(onu_port) except ValueError: raise ExpectValidationError(_('Zte onu snmp field must be two dot separated integers')) def monitoring_template(self, *args, **kwargs) -> Optional[str]: device = self.db_instance if not device: return host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True))) snmp_item = device.snmp_extra mac = device.mac_addr if device.ip_address: address = device.ip_address elif device.parent_dev: address = device.parent_dev.ip_address else: address = None r = ( "define host{", "\tuse dev-onu-zte-f660", "\thost_name %s" % host_name, "\taddress %s" % address if address else None, "\t_snmp_item %s" % snmp_item if snmp_item is not None else '', "\t_mac_addr %s" % mac if mac is not None else '', "}\n" ) return '\n'.join(i for i in r if i) def register_device(self, extra_data: Dict): _reg_dev_zte(self.db_instance, extra_data, register_f660_onu) def get_fiber_str(self): dev = self.db_instance if not dev: return dat = dev.snmp_extra if dat and '.' in dat: snmp_fiber_num, onu_port_num = dat.split('.') snmp_fiber_num = int(snmp_fiber_num) bin_snmp_fiber_num = bin(snmp_fiber_num)[2:] rack_num = int(bin_snmp_fiber_num[5:13], 2) fiber_num = int(bin_snmp_fiber_num[13:21], 2) return 'gpon-onu_1/%d/%d:%s' % ( rack_num, fiber_num, onu_port_num ) class ZteF601(ZteOnuDevice): description = 'Zte ONU F601' def register_device(self, extra_data: Dict): _reg_dev_zte(self.db_instance, extra_data, register_f601_onu) class HuaweiSwitch(EltexSwitch): description = _('Huawei switch') is_use_device_port = True has_attachable_to_subscriber = True tech_code = 'huawei_s2300' def get_ports(self): interfaces_ids = self.get_list('.1.3.6.1.2.1.17.1.4.1.2') if interfaces_ids is None: raise DeviceImplementationError('Switch returned null') for i, n in enumerate(interfaces_ids): n = int(n) speed = self.get_item('.1.3.6.1.2.1.2.2.1.5.%d' % n) oper_status = safe_int(self.get_item('.1.3.6.1.2.1.2.2.1.7.%d' % n)) oper_status = True if oper_status == 1 else False link_status = safe_int(self.get_item('.1.3.6.1.2.1.2.2.1.8.%d' % n)) link_status = True if link_status == 1 else False ep = EltexPort( self, num=i+1, snmp_num=n, name=self.get_item('.1.3.6.1.2.1.2.2.1.2.%d' % n), # name status=oper_status, # status mac='', # self.get_item('.1.3.6.1.2.1.2.2.1.6.%d' % n), # mac speed=0 if not link_status else safe_int(speed) # speed ) ep.writable = True yield ep