From 614c7dbcb0b3286ba6bdb2d6d3ef3e3b8d58271b Mon Sep 17 00:00:00 2001 From: Dmitry Novikov Date: Thu, 21 Feb 2019 12:43:48 +0300 Subject: [PATCH] add support for ZTE-F601 registration --- devapp/base_intr.py | 2 +- devapp/dev_types.py | 96 +++++---- devapp/forms.py | 4 +- devapp/locale/ru/LC_MESSAGES/django.po | 16 -- devapp/models.py | 1 + devapp/onu_config/__init__.py | 6 + devapp/onu_config/base.py | 86 ++++++++ devapp/onu_config/f601.py | 165 +++++++++++++++ devapp/onu_config/f660.py | 141 +++++++++++++ devapp/views.py | 31 +-- djing/lib/tln/__init__.py | 4 - djing/lib/tln/tln.py | 274 ------------------------- docs/extra_func.md | 3 - requirements.txt | 5 +- 14 files changed, 478 insertions(+), 356 deletions(-) create mode 100644 devapp/onu_config/__init__.py create mode 100644 devapp/onu_config/base.py create mode 100755 devapp/onu_config/f601.py create mode 100644 devapp/onu_config/f660.py delete mode 100644 djing/lib/tln/__init__.py delete mode 100755 djing/lib/tln/tln.py diff --git a/devapp/base_intr.py b/devapp/base_intr.py index fd525b0..c867cfa 100644 --- a/devapp/base_intr.py +++ b/devapp/base_intr.py @@ -70,7 +70,7 @@ class DevBase(object, metaclass=ABCMeta): def validate_extra_snmp_info(v: str) -> None: """ Validate extra snmp field for each device. - If validation failed then raise en exception from djing.lib.tln.ValidationError + If validation failed then raise en exception from devapp.onu_config.ExpectValidationError with description of error. :param v: String value for validate """ diff --git a/devapp/dev_types.py b/devapp/dev_types.py index 952a662..9e50784 100644 --- a/devapp/dev_types.py +++ b/devapp/dev_types.py @@ -2,11 +2,12 @@ import re from typing import AnyStr, Iterable, Optional, Dict from datetime import timedelta from easysnmp import EasySNMPTimeoutError +from pexpect import TIMEOUT from transliterate import translit from django.utils.translation import gettext_lazy as _, gettext from djing.lib import RuTimedelta, safe_int -from djing.lib.tln.tln import ValidationError as TlnValidationError, register_onu_ZTE_F660 +from devapp.onu_config import register_f601_onu, register_f660_onu, ExpectValidationError, OnuZteRegisterError from .base_intr import ( DevBase, SNMPBaseWorker, BasePort, DeviceImplementationError, ListOrError, DeviceConfigurationError @@ -135,7 +136,7 @@ class ONUdev(BasePort): class OLTDevice(DevBase, SNMPBaseWorker): has_attachable_to_subscriber = False - description = _('PON OLT') + description = 'PON OLT' is_use_device_port = False def __init__(self, dev_instance): @@ -194,7 +195,7 @@ class OLTDevice(DevBase, SNMPBaseWorker): class OnuDevice(DevBase, SNMPBaseWorker): has_attachable_to_subscriber = True - description = _('PON ONU') + description = 'PON ONU BDCOM' tech_code = 'bdcom_onu' is_use_device_port = False @@ -260,7 +261,7 @@ class OnuDevice(DevBase, SNMPBaseWorker): try: int(v) except ValueError: - raise TlnValidationError(_('Onu snmp field must be en integer')) + raise ExpectValidationError(_('Onu snmp field must be en integer')) def monitoring_template(self, *args, **kwargs) -> Optional[str]: device = self.db_instance @@ -351,7 +352,7 @@ def conv_signal(lvl: int) -> float: class Olt_ZTE_C320(OLTDevice): - description = _('OLT ZTE C320') + description = 'OLT ZTE C320' def get_fibers(self): fibers = ({ @@ -413,8 +414,45 @@ class Olt_ZTE_C320(OLTDevice): return 'olt_ztec320.html' +def _reg_dev_zte(device, extra_data: Dict, reg_func): + if not extra_data: + raise DeviceConfigurationError(_('You have not info in extra_data ' + 'field, please fill it in JSON')) + ip = None + if device.ip_address: + ip = device.ip_address + elif device.parent_dev: + ip = device.parent_dev.ip_address + if ip: + mac = str(device.mac_addr) if device.mac_addr else None + + # Format serial number from mac address + # because saved mac address was make from serial number + sn = "ZTEG%s" % ''.join('%.2X' % int(x, base=16) for x in mac.split(':')[-4:]) + telnet = extra_data.get('telnet') + try: + onu_snmp = reg_func( + onu_mac=mac, + serial=sn, + zte_ip_addr=str(ip), + telnet_login=telnet.get('login'), + telnet_passw=telnet.get('password'), + telnet_prompt=telnet.get('prompt'), + onu_vlan=extra_data.get('default_vid') + ) + if onu_snmp is not None: + device.snmp_extra = onu_snmp + device.save(update_fields=('snmp_extra',)) + else: + raise DeviceConfigurationError('unregistered onu not found, sn=%s' % sn) + except TIMEOUT as e: + raise OnuZteRegisterError(e) + else: + raise DeviceConfigurationError('not have ip') + + class ZteOnuDevice(OnuDevice): - description = _('ZTE PON ONU') + description = 'Zte ONU F660' tech_code = 'zte_onu' def get_details(self) -> Optional[Dict]: @@ -462,7 +500,7 @@ class ZteOnuDevice(OnuDevice): fiber_num, onu_port = v.split('.') int(fiber_num), int(onu_port) except ValueError: - raise TlnValidationError(_('Zte onu snmp field must be two dot separated integers')) + raise ExpectValidationError(_('Zte onu snmp field must be two dot separated integers')) def monitoring_template(self, *args, **kwargs) -> Optional[str]: device = self.db_instance @@ -489,42 +527,7 @@ class ZteOnuDevice(OnuDevice): return '\n'.join(i for i in r if i) def register_device(self, extra_data: Dict): - if not extra_data: - raise DeviceConfigurationError(_('You have not info in extra_data field, please fill it in JSON')) - device = self.db_instance - ip = None - if device.ip_address: - ip = device.ip_address - elif device.parent_dev: - ip = device.parent_dev.ip_address - if ip: - mac = str(device.mac_addr).encode() - - # Format serial number from mac address - # because saved mac address was make from serial number - sn = (b'%.2X' % int(x, base=16) for x in mac.split(b':')[-4:]) - sn = b"ZTEG%s" % b''.join(sn) - - telnet = extra_data.get('telnet') - if telnet is None: - raise DeviceConfigurationError('For ZTE configuration needed "telnet" section in extra_data') - login = telnet.get('login') - password = telnet.get('password') - prompt = telnet.get('prompt') - default_vid = extra_data.get('default_vid') - if login is None or password is None or prompt is None: - raise DeviceConfigurationError('For ZTE configuration needed login, password and' - ' prompt for telnet access in extra_data') - if default_vid is None: - raise DeviceConfigurationError('Please specify default vlan id "default_vid" for configuration onu') - stack_num, rack_num, fiber_num, new_onu_port_num = register_onu_ZTE_F660( - olt_ip=ip, onu_sn=sn, login_passwd=(login.encode(), password.encode()), - onu_mac=mac, prompt_title=prompt.encode(), vlan_id=int(default_vid) - ) - bin_snmp_fiber_number = "10000{0:08b}{1:08b}00000000".format(rack_num, fiber_num) - snmp_fiber_num = int(bin_snmp_fiber_number, base=2) - device.snmp_extra = "%d.%d" % (snmp_fiber_num, new_onu_port_num) - device.save(update_fields=('snmp_extra',)) + _reg_dev_zte(self.db_instance, extra_data, register_f660_onu) def get_fiber_str(self): dev = self.db_instance @@ -542,6 +545,13 @@ class ZteOnuDevice(OnuDevice): ) +class ZteF601(ZteOnuDevice): + description = 'Zte ONU F601' + + def register_device(self, extra_data: Dict): + _reg_dev_zte(self.db_instance, extra_data, register_f601_onu) + + class HuaweiSwitch(EltexSwitch): description = _('Huawei switch') is_use_device_port = True diff --git a/devapp/forms.py b/devapp/forms.py index e82cff3..8f37db2 100644 --- a/devapp/forms.py +++ b/devapp/forms.py @@ -4,7 +4,7 @@ from django.utils.translation import gettext_lazy as _ from django.db import IntegrityError from djing.lib import DuplicateEntry -from djing.lib.tln.tln import ValidationError as TlnValidationError +from devapp.onu_config import ExpectValidationError from . import models from djing import MAC_ADDR_REGEX, IP_ADDR_REGEX @@ -43,7 +43,7 @@ class DeviceForm(forms.ModelForm): manager_class = device.get_manager_klass() try: manager_class.validate_extra_snmp_info(snmp_extra) - except TlnValidationError as e: + except ExpectValidationError as e: raise ValidationError( e, code='invalid' ) diff --git a/devapp/locale/ru/LC_MESSAGES/django.po b/devapp/locale/ru/LC_MESSAGES/django.po index 513ca3d..69b0e22 100644 --- a/devapp/locale/ru/LC_MESSAGES/django.po +++ b/devapp/locale/ru/LC_MESSAGES/django.po @@ -30,18 +30,10 @@ msgstr "Свич D'Link" msgid "does not fetch the mac" msgstr "не нашёл мак" -#: dev_types.py:140 -msgid "PON OLT" -msgstr "" - #: dev_types.py:169 views.py:345 views.py:503 msgid "wait for a reply from the SNMP Timeout" msgstr "Время ожидания ответа от SNMP истекло" -#: dev_types.py:199 -msgid "PON ONU" -msgstr "" - #: dev_types.py:214 msgid "Ip address or parent device with ip address required for ONU device" msgstr "" @@ -60,14 +52,6 @@ msgstr "Поле для snmp информации об ONU должно быть msgid "Eltex switch" msgstr "Элтекс свич" -#: dev_types.py:356 -msgid "OLT ZTE C320" -msgstr "" - -#: dev_types.py:419 -msgid "ZTE PON ONU" -msgstr "" - #: dev_types.py:454 msgid "Zte onu snmp field must be two dot separated integers" msgstr "" diff --git a/devapp/models.py b/devapp/models.py index c47ee6b..20c79a9 100644 --- a/devapp/models.py +++ b/devapp/models.py @@ -33,6 +33,7 @@ class Device(models.Model): ('Ex', dev_types.EltexSwitch), ('Zt', dev_types.Olt_ZTE_C320), ('Zo', dev_types.ZteOnuDevice), + ('Z6', dev_types.ZteF601), ('Hw', dev_types.HuaweiSwitch) ) devtype = models.CharField(_('Device type'), max_length=2, default=DEVICE_TYPES[0][0], diff --git a/devapp/onu_config/__init__.py b/devapp/onu_config/__init__.py new file mode 100644 index 0000000..7bc1fef --- /dev/null +++ b/devapp/onu_config/__init__.py @@ -0,0 +1,6 @@ +from .f601 import register_onu as register_f601_onu +from .f660 import register_onu as register_f660_onu +from .base import ( + ZteOltConsoleError, OnuZteRegisterError, + ZTEFiberIsFull, ZteOltLoginFailed, ExpectValidationError +) diff --git a/devapp/onu_config/base.py b/devapp/onu_config/base.py new file mode 100644 index 0000000..20c1fb9 --- /dev/null +++ b/devapp/onu_config/base.py @@ -0,0 +1,86 @@ +import re +import sys +from pexpect import spawn + + +class ZteOltConsoleError(Exception): + pass + + +class OnuZteRegisterError(ZteOltConsoleError): + pass + + +class ZTEFiberIsFull(ZteOltConsoleError): + pass + + +class ZteOltLoginFailed(ZteOltConsoleError): + pass + + +class ExpectValidationError(ValueError): + pass + + +class MySpawn(spawn): + def __init__(self, *args, **kwargs): + super(MySpawn, self).__init__(encoding='utf-8', *args, **kwargs) + self.logfile = sys.stdout + + def do_cmd(self, c, prompt): + self.sendline(c) + return self.expect_exact(prompt) + + def get_lines(self): + return self.buffer.split('\r\n') + + def get_lines_before(self): + return self.before.split('\r\n') + + +def parse_onu_name(onu_name: str, name_regexp=re.compile('[/:_]')): + gpon_onu, stack_num, rack_num, fiber_num, onu_num = name_regexp.split(onu_name) + return { + 'stack_num': stack_num, + 'rack_num': rack_num, + 'fiber_num': fiber_num, + 'onu_num': onu_num + } + + +def get_unregistered_onu(lines, serial): + for line in lines: + if line.startswith('gpon-onu_'): + spls = re.split(r'\s+', line) + if len(spls) > 2: + if serial == spls[1]: + onu_index, sn, state = spls[:3] + return parse_onu_name(onu_index) + + +def get_free_registered_onu_number(lines): + onu_type_regexp = re.compile(r'^\s{1,5}onu \d{1,3} type [-\w\d]{4,64} sn \w{4,64}$') + onu_olt_num = None + i = 0 + for l in lines: + if onu_type_regexp.match(l): + # match line + i += 1 + onu, num, onu_type, onu_type, sn, onu_sn = l.split() + onu_olt_num = int(num) + if onu_olt_num > i: + return i + return onu_olt_num + 1 + + +def sn_to_mac(sn: str): + t = sn[4:].lower() + r = tuple(t[i:i + 2] for i in range(0, len(t), 2)) + return '45:47:%s' % ':'.join(r) + + +def onu_conv(rack_num: int, fiber_num: int, port_num: int): + r = "10000{0:08b}{1:08b}00000000".format(rack_num, fiber_num) + snmp_fiber_num = int(r, base=2) + return "%d.%d" % (snmp_fiber_num, port_num) diff --git a/devapp/onu_config/f601.py b/devapp/onu_config/f601.py new file mode 100755 index 0000000..d1ebba6 --- /dev/null +++ b/devapp/onu_config/f601.py @@ -0,0 +1,165 @@ +import re +from typing import Optional +from djing.lib import process_lock +from . import base + + +def get_onu_template(vlan_id: int, mac_addr: str): + template = ( + 'sn-bind enable sn', + 'tcont 1 profile HSI_100', + 'gemport 1 unicast tcont 1 dir both', + 'switchport mode hybrid vport 1', + 'switchport vlan %d tag vport 1' % vlan_id, + 'port-location format flexible-syntax vport 1', + 'port-location sub-option remote-id enable vport 1', + 'port-location sub-option remote-id name %s vport 1' % mac_addr, + 'dhcp-option82 enable vport 1', + 'dhcp-option82 trust true replace vport 1', + 'ip dhcp snooping enable vport 1' + ) + return template + + +def get_pon_mng_template(vlan_id: int): + template = ( + 'service HSI type internet gemport 1 vlan %d' % vlan_id, + 'loop-detect ethuni eth_0/1 enable', + 'vlan port eth_0/1 mode tag vlan %d' % vlan_id, + 'dhcp-ip ethuni eth_0/1 from-internet' + ) + return template + + +def appy_config(onu_mac: str, sn: str, hostname: str, login: str, password: str, prompt: str, vlan: int): + onu_type = 'ZTE-F601' + + # Входим + ch = base.MySpawn('telnet %s' % hostname) + ch.timeout = 15 + ch.expect_exact('Username:') + ch.do_cmd(login, 'Password:') + + choice = ch.do_cmd(password, ['bad password.', '%s#' % prompt]) + if choice == 0: + raise base.ZteOltLoginFailed + + ch.do_cmd('terminal length 0', '%s#' % prompt) + choice = ch.do_cmd('show gpon onu uncfg', ['No related information to show', '%s#' % prompt]) + + if choice == 0: + ch.close() + raise base.OnuZteRegisterError('unregistered onu not found, sn=%s' % sn) + elif choice == 1: + # Получим незареганные onu + unregistered_onu = base.get_unregistered_onu( + lines=ch.get_lines_before(), + serial=sn + ) + if unregistered_onu is None: + ch.close() + raise base.OnuZteRegisterError('unregistered onu not found, sn=%s' % sn) + + stack_num = int(unregistered_onu.get('stack_num')) + rack_num = int(unregistered_onu.get('rack_num')) + fiber_num = int(unregistered_onu.get('fiber_num')) + + # Получим последнюю зарегистрированную onu + ch.do_cmd('show run int gpon-olt_%(stack)s/%(rack)s/%(fiber)s' % { + 'stack': stack_num, + 'rack': rack_num, + 'fiber': fiber_num + }, '%s#' % prompt) + free_onu_number = base.get_free_registered_onu_number( + ch.get_lines_before() + ) + + if free_onu_number > 126: + ch.close() + raise base.ZTEFiberIsFull('olt fiber %d is full' % fiber_num) + + # enter to config + ch.do_cmd('conf t', '%s(config)#' % prompt) + + int_addr = '%d/%d/%d' % ( + stack_num, + rack_num, + fiber_num + ) + + # go to olt interface + ch.do_cmd('interface gpon-olt_%s' % int_addr, '%s(config-if)#' % prompt) + + # register onu on olt interface + ch.do_cmd('onu %d type %s sn %s' % ( + free_onu_number, + onu_type, + sn + ), '%s(config-if)#' % prompt) + + # Exit from int olt + ch.do_cmd('exit', '%s(config)#' % prompt) + + # Enter to int onu + ch.do_cmd('int gpon-onu_%(int_addr)s:%(onu_num)d' % { + 'int_addr': int_addr, + 'onu_num': free_onu_number + }, '%s(config-if)#' % prompt) + + # Apply int onu config + template = get_onu_template(vlan, onu_mac) + for line in template: + ch.do_cmd(line, '%s(config-if)#' % prompt) + + # Exit + ch.do_cmd('exit', '%s(config)#' % prompt) + + # Enter to pon-onu-mng + ch.do_cmd('pon-onu-mng gpon-onu_%(int_addr)s:%(onu_num)d' % { + 'int_addr': int_addr, + 'onu_num': free_onu_number + }, '%s(gpon-onu-mng)#' % prompt) + + # Apply config to pon-onu-mng + for line in get_pon_mng_template(vlan): + ch.do_cmd(line, '%s(gpon-onu-mng)#' % prompt) + + # Exit + ch.do_cmd('exit', '%s(config)#' % prompt) + + ch.close() + return base.onu_conv( + rack_num=rack_num, + fiber_num=fiber_num, + port_num=free_onu_number + ) + else: + ch.close() + raise base.ZteOltConsoleError("I don't know what choice:", choice) + + +# Main Entry point +@process_lock +def register_onu(onu_mac: Optional[str], serial: str, zte_ip_addr: str, telnet_login: str, + telnet_passw: str, telnet_prompt: str, onu_vlan: int): + + if not re.match(r'^ZTEG[0-9A-F]{8}$', serial): + raise base.ExpectValidationError('Serial not valid, match: ^ZTEG[0-9A-F]{8}$') + + if not isinstance(onu_vlan, int): + onu_vlan = int(onu_vlan) + + if onu_mac is None: + onu_mac = base.sn_to_mac(serial) + + IP4_ADDR_REGEX = ( + r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' + r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' + r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' + r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' + ) + if not re.match(IP4_ADDR_REGEX, zte_ip_addr): + raise base.ExpectValidationError('ip address for zte not valid') + + return appy_config(onu_mac, serial, zte_ip_addr, telnet_login, + telnet_passw, telnet_prompt, onu_vlan) diff --git a/devapp/onu_config/f660.py b/devapp/onu_config/f660.py new file mode 100644 index 0000000..a7086e0 --- /dev/null +++ b/devapp/onu_config/f660.py @@ -0,0 +1,141 @@ +import re +from typing import Optional + +from djing.lib import process_lock +from . import base + + +def get_onu_template(vlan_id: int, mac_addr: str): + template = ( + 'switchport mode hybrid vport 1', + 'switchport vlan %d tag vport 1' % vlan_id, + 'port-location format flexible-syntax vport 1', + 'port-location sub-option remote-id enable vport 1', + 'port-location sub-option remote-id name %s vport 1' % mac_addr, + 'dhcp-option82 enable vport 1', + 'dhcp-option82 trust true replace vport 1', + 'ip dhcp snooping enable vport 1' + ) + return template + + +def appy_config(onu_mac: str, sn: str, hostname: str, login: str, password: str, prompt: str, vlan: int): + onu_type = 'ZTE-F660' + + # Входим + ch = base.MySpawn('telnet %s' % hostname) + ch.timeout = 15 + ch.expect_exact('Username:') + ch.do_cmd(login, 'Password:') + + choice = ch.do_cmd(password, ['bad password.', '%s#' % prompt]) + if choice == 0: + raise base.ZteOltLoginFailed + + ch.do_cmd('terminal length 0', '%s#' % prompt) + choice = ch.do_cmd('show gpon onu uncfg', ['No related information to show', '%s#' % prompt]) + if choice == 0: + ch.close() + raise base.OnuZteRegisterError('unregistered onu not found, sn=%s' % sn) + elif choice == 1: + # Получим незареганные onu + unregistered_onu = base.get_unregistered_onu( + lines=ch.get_lines_before(), + serial=sn + ) + if unregistered_onu is None: + ch.close() + raise base.OnuZteRegisterError('unregistered onu not found, sn=%s' % sn) + stack_num = int(unregistered_onu.get('stack_num')) + rack_num = int(unregistered_onu.get('rack_num')) + fiber_num = int(unregistered_onu.get('fiber_num')) + + # Получим последнюю зарегистрированную onu + ch.do_cmd('show run int gpon-olt_%(stack)s/%(rack)s/%(fiber)s' % { + 'stack': stack_num, + 'rack': rack_num, + 'fiber': fiber_num + }, '%s#' % prompt) + free_onu_number = base.get_free_registered_onu_number( + ch.get_lines_before() + ) + if free_onu_number > 126: + ch.close() + raise base.ZTEFiberIsFull('olt fiber %d is full' % fiber_num) + + # enter to config + ch.do_cmd('conf t', '%s(config)#' % prompt) + int_addr = '%d/%d/%d' % ( + stack_num, + rack_num, + fiber_num + ) + + # go to olt interface + ch.do_cmd('interface gpon-olt_%s' % int_addr, '%s(config-if)#' % prompt) + + # register onu on olt interface + ch.do_cmd('onu %d type %s sn %s' % ( + free_onu_number, + onu_type, + sn + ), '%s(config-if)#' % prompt) + # register onu profile on olt interface + ch.do_cmd( + 'onu %d profile line ZTE-F660-LINE remote ZTE-F660-ROUTER' % free_onu_number, + '%s(config-if)#' % prompt + ) + + # Exit from int olt + ch.do_cmd('exit', '%s(config)#' % prompt) + + # Enter to int onu + ch.do_cmd('int gpon-onu_%(int_addr)s:%(onu_num)d' % { + 'int_addr': int_addr, + 'onu_num': free_onu_number + }, '%s(config-if)#' % prompt) + + # Apply int onu config + template = get_onu_template(vlan, onu_mac) + for line in template: + ch.do_cmd(line, '%s(config-if)#' % prompt) + + # Exit + ch.do_cmd('exit', '%s(config)#' % prompt) + ch.do_cmd('exit', '%s#' % prompt) + ch.close() + return base.onu_conv( + rack_num=rack_num, + fiber_num=fiber_num, + port_num=free_onu_number + ) + else: + ch.close() + raise base.ZteOltConsoleError("I don't know what choice:", choice) + + +# Main Entry point +@process_lock +def register_onu(onu_mac: Optional[str], serial: str, zte_ip_addr: str, telnet_login: str, + telnet_passw: str, telnet_prompt: str, onu_vlan: int): + + if not re.match(r'^ZTEG[0-9A-F]{8}$', serial): + raise base.ExpectValidationError('Serial not valid, match: ^ZTEG[0-9A-F]{8}$') + + if not isinstance(onu_vlan, int): + onu_vlan = int(onu_vlan) + + if onu_mac is None: + onu_mac = base.sn_to_mac(serial) + + IP4_ADDR_REGEX = ( + r'^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' + r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' + r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' + r'(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' + ) + if not re.match(IP4_ADDR_REGEX, zte_ip_addr): + raise base.ExpectValidationError('ip address for zte not valid') + + return appy_config(onu_mac, serial, zte_ip_addr, telnet_login, + telnet_passw, telnet_prompt, onu_vlan) diff --git a/devapp/views.py b/devapp/views.py index 0d2c845..8678f01 100644 --- a/devapp/views.py +++ b/devapp/views.py @@ -21,17 +21,16 @@ from djing.lib import safe_int, ProcessLocked, DuplicateEntry from djing.lib.decorators import json_view from djing.lib.decorators import only_admins, hash_auth_view from djing.lib.mixins import LoginAdminPermissionMixin, LoginAdminMixin -from djing.lib.tln import ZteOltConsoleError, OnuZteRegisterError, \ - ZteOltLoginFailed from djing.tasks import multicast_email_notify from easysnmp import EasySNMPTimeoutError, EasySNMPError from group_app.models import Group from messenger.tasks import multicast_viber_notify from guardian.decorators import permission_required_or_403 as permission_required from guardian.shortcuts import get_objects_for_user -from .forms import DeviceForm, PortForm, DeviceExtraDataForm -from .models import Device, Port, DeviceDBException, DeviceMonitoringException -from .tasks import onu_register +from devapp.forms import DeviceForm, PortForm, DeviceExtraDataForm +from devapp.models import Device, Port, DeviceDBException, DeviceMonitoringException +from devapp.tasks import onu_register +from devapp import onu_config class DevicesListView(LoginAdminPermissionMixin, @@ -784,17 +783,25 @@ def register_device(request, group_id: int, device_id: int): try: device.register_device() status = 0 - except OnuZteRegisterError: + except onu_config.OnuZteRegisterError: text = format_msg(gettext('Unregistered onu not found'), 'eye-close') - except ZteOltLoginFailed: - text = format_msg(gettext('Wrong login or password for telnet access'), - 'lock') - except (ConnectionRefusedError, ZteOltConsoleError) as e: + except onu_config.ZteOltLoginFailed: + text = format_msg( + gettext('Wrong login or password for telnet access'), + 'lock' + ) + except ( + ConnectionRefusedError, onu_config.ZteOltConsoleError, + onu_config.ExpectValidationError, onu_config.ZTEFiberIsFull + ) as e: text = format_msg(e, 'exclamation-sign') except DeviceImplementationError as e: - text = format_msg(e, 'wrench') + text = format_msg(str(e), 'wrench') except ProcessLocked: - text = format_msg(gettext('Process locked by another process'), 'time') + text = format_msg( + gettext('Process locked by another process'), + 'time' + ) else: text = format_msg(msg='ok', icon='ok') return { diff --git a/djing/lib/tln/__init__.py b/djing/lib/tln/__init__.py deleted file mode 100644 index afd3a90..0000000 --- a/djing/lib/tln/__init__.py +++ /dev/null @@ -1,4 +0,0 @@ -from .tln import * - -__all__ = ('TelnetApi', 'ValidationError', 'ZTEFiberIsFull', 'ZteOltLoginFailed', - 'OnuZteRegisterError', 'ZteOltConsoleError', 'register_onu_ZTE_F660') diff --git a/djing/lib/tln/tln.py b/djing/lib/tln/tln.py deleted file mode 100755 index 5d4d347..0000000 --- a/djing/lib/tln/tln.py +++ /dev/null @@ -1,274 +0,0 @@ -#!/usr/bin/env python3 -import re -import struct -from telnetlib import Telnet -from time import sleep -from typing import Generator, Dict, Optional, Tuple - -from djing.lib import process_lock - - -class ZteOltConsoleError(Exception): - pass - - -class OnuZteRegisterError(ZteOltConsoleError): - pass - - -class ZTEFiberIsFull(ZteOltConsoleError): - pass - - -class ZteOltLoginFailed(ZteOltConsoleError): - pass - - -class ValidationError(ValueError): - pass - - -MAC_ADDR_REGEX = b'^([0-9A-Fa-f]{1,2}[:-]){5}([0-9A-Fa-f]{1,2})$' -IP_ADDR_REGEX = ( - '^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' - '(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' - '(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.' - '(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)$' -) -ONU_SN_REGEX = b'^ZTEG[A-F\d]{8}$' - - -class TelnetApi(Telnet): - - def __init__(self, prompt_string: bytes, *args, **kwargs): - timeout = kwargs.get('timeout') - if timeout: - self._timeout = timeout - self._prompt_string = prompt_string or b'ZTE#' - self.config_level = [] - super().__init__(*args, **kwargs) - - def write(self, buffer: bytes) -> None: - buffer = buffer + b'\n' - print('>>', buffer) - super().write(buffer) - - def resize_screen(self, width: int, height: int): - naws_cmd = struct.pack('>BBBHHBB', - 255, 250, 31, # IAC SB NAWS - width, height, - 255, 240 # IAC SE - ) - sock = self.get_socket() - sock.send(naws_cmd) - - def read_lines(self) -> Generator: - while True: - line = self.read_until(b'\r\n', timeout=self._timeout) - line = line.replace(b'\r\n', b'') - if self._prompt_string == line: - break - if line == b'': - continue - yield line - - def command_to(self, cmd: bytes) -> Generator: - self.write(cmd) - return self.read_lines() - - def set_prompt_string(self, prompt_string: bytes) -> None: - self.config_level.append(prompt_string) - self._prompt_string = prompt_string - - def level_exit(self) -> Optional[Tuple]: - if len(self.config_level) < 2: - print('We are in root') - return - self.config_level.pop() - self.set_prompt_string(self.config_level[-1]) - return tuple(self.command_to(b'exit')) - - def __del__(self): - if self.sock: - self.write(b'exit') - super().__del__() - - -def parse_onu_name(onu_name: bytes, name_regexp=re.compile(b'[/:_]')) -> Dict[str, bytes]: - gpon_onu, stack_num, rack_num, fiber_num, onu_num = name_regexp.split(onu_name) - return { - 'stack_num': stack_num, - 'rack_num': rack_num, - 'fiber_num': fiber_num, - 'onu_num': onu_num - } - - -class OltZTERegister(TelnetApi): - - def __init__(self, screen_size: Tuple[int, int], prompt_title: bytes, *args, **kwargs): - super().__init__(prompt_string=prompt_title, *args, **kwargs) - self.prompt_title = prompt_title - self.set_prompt_string(b'%s#' % prompt_title) - self.resize_screen(*screen_size) - - def enter(self, username: bytes, passw: bytes) -> None: - self.read_until(b'Username:') - self.write(username) - self.read_until(b'Password:') - self.write(passw) - for l in self.read_lines(): - if b'bad password' in l: - raise ZteOltLoginFailed - - def get_unregistered_onu(self, sn: bytes) -> Optional[Dict]: - lines = tuple(self.command_to(b'show gpon onu uncfg')) - if len(lines) > 3: - # devices available - # find onu by sn - line = tuple(ln for ln in lines if sn.lower() in ln.lower()) - if len(line) > 0: - line = line[0] - onu_name, onu_sn, onu_state = line.split() - onu_numbers = parse_onu_name(onu_name) - onu_numbers.update({ - 'onu_name': onu_name, - 'onu_sn': onu_sn, - 'onu_state': onu_state - }) - return onu_numbers - - def get_last_registered_onu_number(self, stack_num: int, rack_num: int, fiber_num: int) -> int: - registered_lines = self.command_to(b'show run int gpon-olt_%d/%d/%d' % ( - stack_num, - rack_num, - fiber_num - )) - onu_type_regexp = re.compile(b'^\s{2}onu \d{1,3} type [-\w\d]{4,64} sn \w{4,64}$') - last_onu = 0 - for rl in registered_lines: - if rl == b' --More--': - self.write(b' ') - if onu_type_regexp.match(rl): - _onu, num, _type, onu_type, _sn, onu_sn = rl.split() - last_onu = int(num) - return last_onu - - def enter_to_config_mode(self) -> bool: - prompt = b'%s(config)#' % self.prompt_title - self.set_prompt_string(prompt) - res = tuple(self.command_to(b'config terminal')) - if res[1].startswith(b'Enter configuration commands'): - # ok, we in the config mode - return True - return False - - def go_to_olt_interface(self, stack_num: int, rack_num: int, fiber_num: int) -> Tuple: - self.set_prompt_string(b'%s(config-if)#' % self.prompt_title) - return tuple(self.command_to(b'interface gpon-olt_%d/%d/%d' % ( - stack_num, - rack_num, - fiber_num - ))) - - def go_to_onu_interface(self, stack_num: int, rack_num: int, fiber_num: int, onu_port_num: int) -> Tuple: - self.set_prompt_string(b'%s(config-if)#' % self.prompt_title) - return tuple(self.command_to(b'interface gpon-onu_%d/%d/%d:%d' % ( - stack_num, - rack_num, - fiber_num, - onu_port_num - ))) - - def apply_conf_to_onu(self, mac_addr: bytes, vlan_id: int) -> None: - tmpl = ( - b'switchport vlan %d tag vport 1' % vlan_id, - b'port-location format flexible-syntax vport 1', - b'port-location sub-option remote-id enable vport 1', - b'port-location sub-option remote-id name %s vport 1' % mac_addr, - b'dhcp-option82 enable vport 1', - b'dhcp-option82 trust true replace vport 1', - b'ip dhcp snooping enable vport 1' - ) - for conf_line in tmpl: - self.write(conf_line) - - def register_onu_on_olt_fiber(self, onu_type: bytes, new_onu_num: int, onu_sn: bytes, line_profile: bytes, - remote_profile: bytes) -> Tuple: - # ok, we in interface - tpl = b'onu %d type %s sn %s' % (new_onu_num, onu_type, onu_sn) - r = tuple(self.command_to(tpl)) - return tuple(self.command_to(b'onu %d profile line %s remote %s' % ( - new_onu_num, - line_profile, - remote_profile - ))) + r - - -@process_lock -def register_onu_ZTE_F660(olt_ip: str, onu_sn: bytes, login_passwd: Tuple[bytes, bytes], onu_mac: bytes, prompt_title: bytes, vlan_id: int) -> Tuple: - onu_type = b'ZTE-F660' - line_profile = b'ZTE-F660-LINE' - remote_profile = b'ZTE-F660-ROUTER' - if not re.match(MAC_ADDR_REGEX, onu_mac): - raise ValidationError - if not re.match(IP_ADDR_REGEX, olt_ip): - raise ValidationError - if not re.match(ONU_SN_REGEX, onu_sn): - raise ValidationError - - tn = OltZTERegister(host=olt_ip, timeout=2, screen_size=(120, 128), prompt_title=prompt_title) - tn.enter(*login_passwd) - - unregistered_onu = tn.get_unregistered_onu(onu_sn) - if unregistered_onu is None: - raise OnuZteRegisterError('unregistered onu not found, sn=%s' % onu_sn.decode('utf-8')) - - stack_num = int(unregistered_onu['stack_num']) - rack_num = int(unregistered_onu['rack_num']) - fiber_num = int(unregistered_onu['fiber_num']) - - last_onu_number = tn.get_last_registered_onu_number( - stack_num, rack_num, fiber_num - ) - - if last_onu_number > 126: - raise ZTEFiberIsFull('olt fiber %d is full' % fiber_num) - - # enter to config - if not tn.enter_to_config_mode(): - raise ZteOltConsoleError('Failed to enter to config mode') - - # go to olt interface - if not tn.go_to_olt_interface(stack_num, rack_num, fiber_num): - raise ZteOltConsoleError('Failed to enter in olt fiber port') - - # new onu port number - new_onu_port_num = last_onu_number + 1 - - # register onu on olt interface - r = tn.register_onu_on_olt_fiber(onu_type, new_onu_port_num, onu_sn, line_profile, remote_profile) - print(r) - - # exit from olt interface - tn.level_exit() - - r = tn.go_to_onu_interface(stack_num, rack_num, fiber_num, new_onu_port_num) - print(r) - - tn.apply_conf_to_onu(onu_mac, vlan_id) - sleep(1) - return stack_num, rack_num, fiber_num, new_onu_port_num - - -if __name__ == '__main__': - ip = '192.168.0.100' - try: - register_onu_ZTE_F660( - olt_ip=ip, onu_sn=b'ZTEG^#*$&@&', login_passwd=(b'login', b'password'), - onu_mac=b'MAC' - ) - except ZteOltConsoleError as e: - print(e) - except ConnectionRefusedError: - print('ERROR: connection refused', ip) diff --git a/docs/extra_func.md b/docs/extra_func.md index fb253b7..190790d 100644 --- a/docs/extra_func.md +++ b/docs/extra_func.md @@ -3,13 +3,10 @@ Его совсем не много, но без внимания оставить нельзя. Все вспомогательные модули можно найти в пакете **djing.lib**. -### tln -Это модуль работы по *telnet* ### messaging Этот модуль помогает работать с форматами СМС сообщений. - ### init Содержит всякие мелкие примочки, код прост и с комментариями, зайдите посмотрите. diff --git a/requirements.txt b/requirements.txt index b26aade..771d5f1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -6,7 +6,7 @@ Pillow netaddr # for testing required xmltodict -xmltodict +#xmltodict dicttoxml # db client for Postgres @@ -39,6 +39,9 @@ asterisk # viberbot -e git://github.com/Viber/viber-bot-python.git#egg=viberbot +# pexpect +-e git://github.com/pexpect/pexpect.git#egg=pexpect + Celery redis==2.10.6 celery[redis]