Browse Source

Merge branch 'devel' of https://github.com/nerosketch/djing into devel

devel
www-data 8 years ago
parent
commit
b2b3eed69d
  1. 6
      agent/monitoring_agent.py
  2. 31
      devapp/base_intr.py
  3. 78
      devapp/dev_types.py
  4. 3
      devapp/locale/ru/LC_MESSAGES/django.po
  5. 15
      devapp/models.py
  6. 22
      devapp/templates/devapp/devices.html
  7. 2
      devapp/templates/devapp/ext.htm
  8. 6
      devapp/urls.py
  9. 279
      devapp/views.py
  10. 34
      djing/global_base_views.py
  11. 20
      djing/lib/__init__.py
  12. 35
      djing/lib/decorators.py
  13. 12
      djing/lib/tln/tln.py

6
agent/monitoring_agent.py

@ -21,13 +21,13 @@ def calc_hash(data):
return sha256(result_data).hexdigest()
def check_sign(get_list: Iterable, sign: str):
def check_sign(get_list: Iterable, sign_str: str):
hashed = '_'.join(get_list)
my_sign = calc_hash(hashed)
return sign == my_sign
return sign_str == my_sign
def validate(regexp: Union[bytes, str, re.__Regex], string: AnyStr):
def validate(regexp: Union[bytes, str], string: AnyStr):
if not re.match(regexp, string):
raise ValueError
return string

31
devapp/base_intr.py

@ -5,6 +5,8 @@ from easysnmp import Session
from django.utils.translation import gettext
from djing.lib.decorators import abstract_static_method
ListOrError = Union[
Iterable,
Union[Exception, Iterable]
@ -15,11 +17,11 @@ class DeviceImplementationError(Exception):
pass
class DevBase(object):
class DevBase(object, metaclass=ABCMeta):
def __init__(self, dev_instance=None):
self.db_instance = dev_instance
@staticmethod
@abstract_static_method
def description() -> AnyStr:
pass
@ -43,13 +45,11 @@ class DevBase(object):
def get_template_name(self) -> AnyStr:
"""Return path to html template for device"""
@staticmethod
@abstractmethod
@abstract_static_method
def has_attachable_to_subscriber() -> bool:
"""Can connect device to subscriber"""
@staticmethod
@abstractmethod
@abstract_static_method
def is_use_device_port() -> bool:
"""True if used device port while opt82 authorization"""
@ -62,6 +62,13 @@ class DevBase(object):
:param v: String value for validate
"""
@abstract_static_method
def monitoring_template(device_instance, *args, **kwargs) -> Optional[str]:
"""
Template for monitoring system config
:return: string for config file
"""
class BasePort(object, metaclass=ABCMeta):
def __init__(self, num, name, status, mac, speed):
@ -89,19 +96,29 @@ class SNMPBaseWorker(object, metaclass=ABCMeta):
def __init__(self, ip: Optional[str], community='public', ver=2):
if ip is None or ip == '':
raise DeviceImplementationError(gettext('Ip address is required'))
self.ses = Session(hostname=ip, community=community, version=ver)
self._ip = ip
self._community = community
self._ver = ver
def start_ses(self):
if self.ses is None:
self.ses = Session(hostname=self._ip, community=self._community, version=self._ver)
def set_int_value(self, oid: str, value):
self.start_ses()
return self.ses.set(oid, value, 'i')
def get_list(self, oid) -> Generator:
self.start_ses()
for v in self.ses.walk(oid):
yield v.value
def get_list_keyval(self, oid) -> Generator:
self.start_ses()
for v in self.ses.walk(oid):
snmpnum = v.oid.split('.')[-1:]
yield v.value, snmpnum[0] if len(snmpnum) > 0 else None
def get_item(self, oid):
self.start_ses()
return self.ses.get(oid).value

78
devapp/dev_types.py

@ -1,6 +1,8 @@
import re
from typing import AnyStr, Iterable, Optional, Dict
from datetime import timedelta
from easysnmp import EasySNMPTimeoutError
from transliterate import translit
from django.utils.translation import gettext_lazy as _, gettext
from djing.lib import RuTimedelta, safe_int
@ -8,6 +10,34 @@ from djing.lib.tln.tln import ValidationError as TlnValidationError
from .base_intr import DevBase, SNMPBaseWorker, BasePort, DeviceImplementationError, ListOrError
def _norm_name(name: str, replreg=None):
if replreg is None:
return re.sub(pattern='\W{1,255}', repl='', string=name, flags=re.IGNORECASE)
return replreg.sub('', name)
def plain_ip_device_mon_template(device) -> Optional[AnyStr]:
if not device:
raise ValueError
parent_host_name = _norm_name("%d%s" % (
device.parent_dev.pk, translit(device.parent_dev.comment, language_code='ru', reversed=True)
)) if device.parent_dev else None
host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
mac_addr = device.mac_addr
r = (
"define host{",
"\tuse generic-switch",
"\thost_name %s" % host_name,
"\taddress %s" % device.ip_address,
"\tparents %s" % parent_host_name if parent_host_name is not None else '',
"\t_mac_addr %s" % mac_addr if mac_addr is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)
class DLinkPort(BasePort):
def __init__(self, num, name, status, mac, speed, snmp_worker):
BasePort.__init__(self, num, name, status, mac, speed)
@ -82,6 +112,10 @@ class DLinkDevice(DevBase, SNMPBaseWorker):
# Dlink has no require snmp info
pass
@staticmethod
def monitoring_template(device, *args, **kwargs) -> Optional[str]:
return plain_ip_device_mon_template(device, *args, **kwargs)
class ONUdev(BasePort):
def __init__(self, num, name, status, mac, speed, signal, snmp_worker):
@ -159,6 +193,10 @@ class OLTDevice(DevBase, SNMPBaseWorker):
# Olt has no require snmp info
pass
@staticmethod
def monitoring_template(device, *args, **kwargs) -> Optional[str]:
return plain_ip_device_mon_template(device)
class OnuDevice(DevBase, SNMPBaseWorker):
def __init__(self, dev_instance):
@ -232,6 +270,24 @@ class OnuDevice(DevBase, SNMPBaseWorker):
except ValueError:
raise TlnValidationError(_('Onu snmp field must be en integer'))
@staticmethod
def monitoring_template(device, *args, **kwargs) -> Optional[str]:
if not device:
return
host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
snmp_item = device.snmp_extra
mac = device.mac_addr
r = (
"define host{",
"\tuse device-onu",
"\thost_name %s" % host_name,
# "\taddress %s" % device.ip_address,
"\t_snmp_item %s" % snmp_item if snmp_item is not None else '',
"\t_mac_addr %s" % mac if mac is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)
class EltexPort(BasePort):
def __init__(self, snmp_worker, *args, **kwargs):
@ -287,6 +343,10 @@ class EltexSwitch(DLinkDevice):
def is_use_device_port():
return False
@staticmethod
def monitoring_template(device, *args, **kwargs) -> Optional[str]:
return plain_ip_device_mon_template(device)
def conv_signal(lvl: int) -> float:
if lvl == 65535: return 0.0
@ -384,3 +444,21 @@ class ZteOnuDevice(OnuDevice):
int(fiber_num), int(onu_port)
except ValueError:
raise TlnValidationError(_('Zte onu snmp field must be two dot separated integers'))
@staticmethod
def monitoring_template(device, *args, **kwargs) -> Optional[str]:
if not device:
return
host_name = _norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
snmp_item = device.snmp_extra
mac = device.mac_addr
r = (
"define host{",
"\tuse dev-onu-zte-f660",
"\thost_name %s" % host_name,
# "\taddress %s" % device.ip_address,
"\t_snmp_item %s" % snmp_item if snmp_item is not None else '',
"\t_mac_addr %s" % mac if mac is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)

3
devapp/locale/ru/LC_MESSAGES/django.po

@ -603,3 +603,6 @@ msgstr "%(device_name)s недостижим"
msgid "Device %(device_name)s getting undefined status code"
msgstr "Устройство %(device_name)s получило не определённый код состояния"
msgid "View"
msgstr "Посмотреть"

15
devapp/models.py

@ -1,14 +1,15 @@
import os
from typing import Optional, AnyStr
from subprocess import run
from django.db import models
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from djing.fields import MACAddressField
from .base_intr import DevBase
from djing.lib import MyGenericIPAddressField, MyChoicesAdapter
from . import dev_types
from subprocess import run
from django.conf import settings
from django.utils.translation import gettext_lazy as _
from group_app.models import Group
from . import dev_types
from .base_intr import DevBase
class DeviceDBException(Exception):
@ -104,6 +105,10 @@ class Device(models.Model):
param = 'update'
run((filepath, param, newmac, code))
def generate_config_template(self) -> Optional[AnyStr]:
mng_class = self.get_manager_klass()
return mng_class.monitoring_template(self)
class Port(models.Model):
device = models.ForeignKey(Device, models.CASCADE, verbose_name=_('Device'))

22
devapp/templates/devapp/devices.html

@ -43,21 +43,23 @@
<tbody>
{% with can_del_dev=perms.devapp.delete_device can_ch_dev=perms.devapp.change_device grpid=group.id %}
{% for dev in devices %}
{% url 'devapp:view' grpid dev.pk as viewurl %}
<tr>
<td>
{% if dev.status == 'und' %}&ndash;
{% else %}
{% if dev.status == 'unr' or dev.status == 'dwn' %}
<span class="glyphicon glyphicon-exclamation-sign text-danger"></span>
{% elif dev.status == 'up' %}
<span class="glyphicon glyphicon-ok-circle text-success"></span>
<a href="{{ viewurl }}" class="btn btn-sm btn-default">
{% if dev.status == 'und' %}&ndash;
{% else %}
<span class="glyphicon glyphicon-question-sign text-warning"></span>
{% if dev.status == 'unr' or dev.status == 'dwn' %}
<span class="glyphicon glyphicon-eye-open text-danger"></span>
{% elif dev.status == 'up' %}
<span class="glyphicon glyphicon-eye-open text-success"></span>
{% else %}
<span class="glyphicon glyphicon-eye-open text-warning"></span>
{% endif %}
{% endif %}
{% endif %}
</a>
</td>
<td><a href="{% url 'devapp:view' grpid dev.pk %}">{{ dev.ip_address }}</a></td>
<td><a href="{{ viewurl }}">{{ dev.ip_address }}</a></td>
<td>{{ dev.comment }}</td>
<td>{{ dev.mac_addr|default:_('Not assigned') }}</td>
<td>{{ dev.get_devtype_display }}</td>

2
devapp/templates/devapp/ext.htm

@ -26,7 +26,7 @@
{% url 'devapp:view' dev.group.pk|default:0 dev.pk as devapp_view %}
<li{% if devapp_view == request.path %} class="active"{% endif %}>
<a href="{{ devapp_view }}">
{% trans 'Ports' %} {{ dev.ip_address }}
{% trans 'View' %} {{ dev.ip_address }}
</a>
</li>

6
devapp/urls.py

@ -8,11 +8,11 @@ urlpatterns = [
url(r'^devices_without_groups$', views.DevicesWithoutGroupsListView.as_view(), name='devices_null_group'),
url(r'^fix_onu/$', views.fix_onu, name='fix_onu'),
url(r'^(?P<group_id>\d+)$', views.DevicesListView.as_view(), name='devs'),
url(r'^(?P<group_id>\d+)/add$', views.dev, name='add'),
url(r'^(?P<group_id>\d+)/add$', views.DeviceCreateView.as_view(), name='add'),
url(r'^(\d+)/(?P<device_id>\d+)$', views.devview, name='view'),
url(r'^(\d+)/(?P<device_id>\d+)/del$', views.DeviceVeleteView.as_view(), name='del'),
url(r'^(?P<group_id>\d+)/(?P<device_id>\d+)/add$', views.add_single_port, name='add_port'),
url(r'^(?P<group_id>\d+)/(?P<device_id>\d+)/edit$', views.dev, name='edit'),
url(r'^(?P<group_id>\d+)/(?P<device_id>\d+)/edit$', views.DeviceUpdate.as_view(), name='edit'),
url(r'^(\d+)/(?P<device_id>\d+)/ports$', views.manage_ports, name='manage_ports'),
url(r'^(?P<group_id>\d+)/(?P<device_id>\d+)/ports/(?P<port_id>\d+)/fix_port_conflict$', views.fix_port_conflict,
name='fix_port_conflict'),
@ -32,6 +32,6 @@ urlpatterns = [
url(r'^on_device_event/$', views.OnDeviceMonitoringEvent.as_view()),
# Nagios mon generate
url(r'^nagios/hosts/$', views.NagiosObjectsConfView.as_view(), name='nagios_objects_conf'),
url(r'^nagios/hosts/$', views.nagios_objects_conf, name='nagios_objects_conf'),
url(r'^api/getall/$', views.DevicesGetListView.as_view())
]

279
devapp/views.py

@ -1,5 +1,4 @@
import re
from typing import Optional
from django.contrib.auth.decorators import login_required
from django.contrib.gis.shortcuts import render_to_text
from django.core.exceptions import PermissionDenied
@ -11,10 +10,10 @@ from django.contrib import messages
from django.utils.decorators import method_decorator
from django.utils.translation import gettext_lazy as _, gettext
from easysnmp import EasySNMPTimeoutError, EasySNMPError
from django.views.generic import DetailView, DeleteView
from django.views.generic import DetailView, DeleteView, UpdateView, CreateView
from devapp.base_intr import DeviceImplementationError
from djing.lib.decorators import only_admins
from djing.lib.decorators import only_admins, hash_auth_view
from djing.lib import safe_int
from abonapp.models import Abon
from group_app.models import Group
@ -88,81 +87,138 @@ class DeviceVeleteView(DeleteView):
return res
@login_required
@permission_required('devapp.can_view_device')
def dev(request, group_id, device_id=0):
device_group = get_object_or_404(Group, pk=group_id)
if not request.user.has_perm('group_app.can_view_group', device_group):
raise PermissionDenied
devinst = get_object_or_404(Device, id=device_id) if device_id != 0 else None
@method_decorator(login_required, name='dispatch')
@method_decorator(permission_required('devapp.can_view_device'), name='dispatch')
class DeviceUpdate(UpdateView):
template_name = 'devapp/dev.html'
context_object_name = 'dev'
model = Device
form_class = DeviceForm
pk_url_kwarg = 'device_id'
device_group = None
already_dev = None
if request.method == 'POST':
if device_id == 0:
if not request.user.has_perm('devapp.add_device'):
raise PermissionDenied
else:
if not request.user.has_perm('devapp.change_device'):
raise PermissionDenied
def post(self, request, *args, **kwargs):
if not request.user.has_perm('devapp.change_device'):
raise PermissionDenied
try:
frm = DeviceForm(request.POST, instance=devinst)
if frm.is_valid():
# check if that device is exist
try:
already_dev = Device.objects.exclude(pk=device_id).get(mac_addr=request.POST.get('mac_addr'))
if already_dev.group:
messages.warning(request, _('You have redirected to existing device'))
return redirect('devapp:view', already_dev.group.pk, already_dev.pk)
else:
messages.warning(request, _('Please attach group for device'))
return redirect('devapp:fix_device_group', already_dev.pk)
except Device.DoesNotExist:
pass
# else update device info
ndev = frm.save()
# change device info in dhcpd.conf
ndev.update_dhcp()
messages.success(request, _('Device info has been saved'))
return redirect('devapp:edit', ndev.group.pk, ndev.pk)
else:
messages.error(request, _('Form is invalid, check fields and try again'))
return super().post(request, *args, **kwargs)
except IntegrityError as e:
if 'unique constraint' in str(e):
messages.error(request, _('Duplicate user and port: %s') % e)
else:
messages.error(request, e)
else:
if devinst is None:
frm = DeviceForm(initial={
'group': device_group,
'devtype': request.GET.get('t'),
'mac_addr': request.GET.get('mac'),
'comment': request.GET.get('c'),
'ip_address': request.GET.get('ip'),
'man_passw': getattr(settings, 'DEFAULT_SNMP_PASSWORD', ''),
'snmp_extra': request.GET.get('n') or ''
})
else:
frm = DeviceForm(instance=devinst)
if devinst is None:
parent_device_id = request.GET.get('pdev')
return render(request, 'devapp/add_dev.html', {
'form': frm,
'group': device_group,
'already_dev': already_dev,
'selected_parent_dev': get_object_or_None(Device, pk=parent_device_id)
})
else:
return render(request, 'devapp/dev.html', {
'form': frm,
'dev': devinst,
'selected_parent_dev': devinst.parent_dev,
'group': device_group,
'already_dev': already_dev
})
return self.form_invalid(self.get_form())
def form_valid(self, form):
# check if that device is exist
device_id = self.kwargs.get(self.pk_url_kwarg)
try:
already_dev = self.model.objects.exclude(pk=device_id).get(mac_addr=self.request.POST.get('mac_addr'))
self.already_dev = already_dev
if already_dev.group:
messages.warning(self.request, _('You have redirected to existing device'))
return redirect('devapp:view', already_dev.group.pk, already_dev.pk)
else:
messages.warning(self.request, _('Please attach group for device'))
return redirect('devapp:fix_device_group', already_dev.pk)
except Device.DoesNotExist:
pass
r = super().form_valid(form)
# change device info in dhcpd.conf
print(self.object)
self.object.update_dhcp()
messages.success(self.request, _('Device info has been saved'))
return r
def get_success_url(self):
return resolve_url('devapp:edit', self.device_group.pk, self.object.pk)
def dispatch(self, request, *args, **kwargs):
group_id = self.kwargs.get('group_id')
device_group = get_object_or_404(Group, pk=group_id)
if not request.user.has_perm('group_app.can_view_group', device_group):
raise PermissionDenied
self.device_group = device_group
return super().dispatch(request, *args, **kwargs)
def form_invalid(self, form):
messages.error(self.request, _('Form is invalid, check fields and try again'))
return super().form_invalid(form)
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['selected_parent_dev'] = self.object.parent_dev
context['group'] = self.device_group
context['already_dev'] = self.already_dev
return context
@method_decorator(login_required, name='dispatch')
@method_decorator(permission_required('devapp.can_view_device'), name='dispatch')
class DeviceCreateView(CreateView):
template_name = 'devapp/add_dev.html'
context_object_name = 'dev'
model = Device
form_class = DeviceForm
device_group = None
already_dev = None
def get(self, request, *args, **kwargs):
if not request.user.has_perm('devapp.add_device'):
raise PermissionDenied
return super().get(request, *args, **kwargs)
def form_valid(self, form):
# check if that device is exist
device_id = self.kwargs.get(self.pk_url_kwarg)
try:
already_dev = self.model.objects.exclude(pk=device_id).get(mac_addr=self.request.POST.get('mac_addr'))
self.already_dev = already_dev
if already_dev.group:
messages.warning(self.request, _('You have redirected to existing device'))
return redirect('devapp:view', already_dev.group.pk, already_dev.pk)
else:
messages.warning(self.request, _('Please attach group for device'))
return redirect('devapp:fix_device_group', already_dev.pk)
except Device.DoesNotExist:
pass
r = super().form_valid(form)
# change device info in dhcpd.conf
print(self.object)
self.object.update_dhcp()
messages.success(self.request, _('Device info has been saved'))
return r
def get_success_url(self):
return resolve_url('devapp:edit', self.device_group.pk, self.object.pk)
def dispatch(self, request, *args, **kwargs):
group_id = self.kwargs.get('group_id')
device_group = get_object_or_404(Group, pk=group_id)
if not request.user.has_perm('group_app.can_view_group', device_group):
raise PermissionDenied
self.device_group = device_group
return super().dispatch(request, *args, **kwargs)
def get_initial(self):
return {
'group': self.device_group,
'devtype': self.request.GET.get('t'),
'mac_addr': self.request.GET.get('mac'),
'comment': self.request.GET.get('c'),
'ip_address': self.request.GET.get('ip'),
'man_passw': getattr(settings, 'DEFAULT_SNMP_PASSWORD', ''),
'snmp_extra': self.request.GET.get('n') or ''
}
def get_context_data(self, **kwargs):
context = super().get_context_data(**kwargs)
context['group'] = self.device_group
context['already_dev'] = self.already_dev
parent_device_id = self.request.GET.get('pdev')
context['selected_parent_dev'] = get_object_or_None(Device, pk=parent_device_id)
return context
@login_required
@ -471,7 +527,7 @@ def fix_device_group(request, device_id):
device = get_object_or_404(Device, pk=device_id)
try:
if request.method == 'POST':
frm = DeviceForm(request.POST, instance=dev)
frm = DeviceForm(request.POST, instance=device)
if frm.is_valid():
ch_dev = frm.save()
if ch_dev.group:
@ -482,7 +538,7 @@ def fix_device_group(request, device_id):
else:
messages.error(request, _('Form is invalid, check fields and try again'))
else:
frm = DeviceForm(instance=dev)
frm = DeviceForm(instance=device)
except ValueError:
return HttpResponse('ValueError')
return render(request, 'devapp/fix_dev_group.html', {
@ -605,70 +661,21 @@ class OnDeviceMonitoringEvent(global_base_views.SecureApiView):
}
class NagiosObjectsConfView(global_base_views.AuthenticatedOrHashAuthView):
http_method_names = ('get',)
def get(self, request, *args, **kwargs):
from transliterate import translit
confs = list()
def norm_name(name: str, replreg=re.compile(r'\W{1,255}', re.IGNORECASE)):
return replreg.sub('', name)
for device in Device.objects.exclude(Q(mac_addr=None) | Q(ip_address='127.0.0.1')) \
.select_related('parent_dev') \
.only('ip_address', 'comment', 'parent_dev'):
host_name = norm_name("%d%s" % (device.pk, translit(device.comment, language_code='ru', reversed=True)))
conf = None
mac_addr = device.mac_addr
if device.devtype == 'On':
if device.parent_dev:
host_addr = device.parent_dev.ip_address
conf = self.templ_onu(host_name, host_addr, mac=mac_addr, snmp_item=device.snmp_extra or None)
else:
if device.ip_address:
host_addr = device.ip_address
conf = self.templ_onu(host_name, host_addr, mac=mac_addr, snmp_item=device.snmp_extra or None)
else:
parent_host_name = norm_name("%d%s" % (
device.parent_dev.pk, translit(device.parent_dev.comment, language_code='ru', reversed=True)
)) if device.parent_dev else None
conf = self.templ(host_name, host_addr=device.ip_address, mac=mac_addr, parent_host_name=parent_host_name)
if conf is not None:
confs.append(conf)
response = HttpResponse(''.join(confs), content_type='text/plain')
response['Content-Disposition'] = 'attachment; filename="objects.cfg"'
return response
@staticmethod
def templ(host_name: str, host_addr: str, mac: Optional[str], parent_host_name: Optional[str]):
if not host_addr:
return
r = (
"define host{",
"\tuse generic-switch",
"\thost_name %s" % host_name,
"\taddress %s" % host_addr,
"\tparents %s" % parent_host_name if parent_host_name is not None else '',
"\t_mac_addr %s" % mac if mac is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)
@staticmethod
def templ_onu(host_name: str, host_addr: str, mac: Optional[str], snmp_item: str):
if not host_addr:
return
r = (
"define host{",
"\tuse device-onu",
"\thost_name %s" % host_name,
"\taddress %s" % host_addr,
"\t_snmp_item %s" % snmp_item if snmp_item is not None else '',
"\t_mac_addr %s" % mac if mac is not None else '',
"}\n"
)
return '\n'.join(i for i in r if i)
@hash_auth_view
def nagios_objects_conf(request):
def getconf(device_instance: Device):
config = device_instance.generate_config_template()
if config is not None:
return config
devices_queryset = Device.objects.exclude(Q(mac_addr=None) | Q(ip_address='127.0.0.1')) \
.select_related('parent_dev') \
.only('ip_address', 'comment', 'parent_dev')
confs = map(getconf, devices_queryset)
confs = (c for c in confs if c is not None)
response = HttpResponse(''.join(confs), content_type='text/plain')
response['Content-Disposition'] = 'attachment; filename="objects.cfg"'
return response
class DevicesGetListView(global_base_views.SecureApiView):

34
djing/global_base_views.py

@ -1,5 +1,6 @@
from hashlib import sha256
from json import dumps
from django.utils.decorators import method_decorator
from django.views.generic.base import View
from django.http.response import HttpResponseForbidden, Http404, HttpResponseRedirect, HttpResponse
from django.utils.translation import gettext_lazy as _
@ -7,6 +8,7 @@ from django.conf import settings
from django.views.generic import ListView
from netaddr import IPNetwork, IPAddress
from django.core.paginator import InvalidPage, EmptyPage
from djing.lib.decorators import hash_auth_view
API_AUTH_SECRET = getattr(settings, 'API_AUTH_SECRET')
API_AUTH_SUBNET = getattr(settings, 'API_AUTH_SUBNET')
@ -22,20 +24,8 @@ class RedirectWhenError(Exception):
return self.message or ''
@method_decorator(hash_auth_view, name='dispatch')
class HashAuthView(View):
@staticmethod
def calc_hash(data):
if type(data) is str:
result_data = data.encode('utf-8')
else:
result_data = bytes(data)
return sha256(result_data).hexdigest()
@staticmethod
def check_sign(get_list, sign):
hashed = '_'.join(get_list)
my_sign = HashAuthView.calc_hash(hashed)
return sign == my_sign
def __init__(self, *args, **kwargs):
if API_AUTH_SECRET is None or API_AUTH_SECRET == 'your api secret':
@ -43,22 +33,6 @@ class HashAuthView(View):
else:
super(HashAuthView, self).__init__(*args, **kwargs)
def dispatch(self, request, *args, **kwargs):
sign = request.GET.get('sign')
if sign is None or sign == '':
return HttpResponseForbidden('Access Denied')
# Transmittent get list without sign
get_values = request.GET.copy()
del get_values['sign']
values_list = [l for l in get_values.values() if l]
values_list.sort()
values_list.append(API_AUTH_SECRET)
if self.check_sign(values_list, sign):
return super(HashAuthView, self).dispatch(request, *args, **kwargs)
else:
return HttpResponseForbidden('Access Denied')
class AuthenticatedOrHashAuthView(HashAuthView):

20
djing/lib/__init__.py

@ -1,6 +1,6 @@
import socket
import struct
from abc import ABCMeta
from hashlib import sha256
from datetime import timedelta
from collections import Iterator
from django.db import models
@ -133,3 +133,21 @@ class Singleton(type):
if cls not in cls._instances:
cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs)
return cls._instances[cls]
#
# Function for hash auth
#
def calc_hash(data):
if type(data) is str:
result_data = data.encode('utf-8')
else:
result_data = bytes(data)
return sha256(result_data).hexdigest()
def check_sign(get_list, sign):
hashed = '_'.join(get_list)
my_sign = calc_hash(hashed)
return sign == my_sign

35
djing/lib/decorators.py

@ -1,10 +1,12 @@
from functools import wraps
from django.conf import settings
from django.http import HttpResponseRedirect
from django.http import HttpResponseRedirect, HttpResponseForbidden
from django.shortcuts import redirect
from djing.lib import check_sign
DEBUG = getattr(settings, 'DEBUG', False)
API_AUTH_SECRET = getattr(settings, 'API_AUTH_SECRET')
def require_ssl(view):
@ -33,3 +35,34 @@ def only_admins(fn):
else:
return redirect('client_side:home')
return wrapped
# hash auth for functional views
def hash_auth_view(fn):
@wraps(fn)
def wrapped(request, *args, **kwargs):
sign = request.GET.get('sign')
if sign is None or sign == '':
return HttpResponseForbidden('Access Denied')
# Transmittent get list without sign
get_values = request.GET.copy()
del get_values['sign']
values_list = [l for l in get_values.values() if l]
values_list.sort()
values_list.append(API_AUTH_SECRET)
if check_sign(values_list, sign):
return fn(request, *args, **kwargs)
else:
return HttpResponseForbidden('Access Denied')
return wrapped
class abstract_static_method(staticmethod):
__slots__ = ()
def __init__(self, func):
super(abstract_static_method, self).__init__(func)
func.__isabstractmethod__ = True
__isabstractmethod__ = True

12
djing/lib/tln/tln.py

@ -18,10 +18,6 @@ class ZTEFiberIsFull(ZteOltConsoleError):
pass
class ZTEFiberNumberNotFound(ZteOltConsoleError):
pass
class ValidationError(ValueError):
pass
@ -222,9 +218,7 @@ def register_onu_ZTE_F660(olt_ip: str, onu_sn: bytes, login_passwd: Tuple[bytes,
stack_num, rack_num, fiber_num
)
if last_onu_number < 1:
raise ZTEFiberNumberNotFound
elif last_onu_number > 126:
if last_onu_number > 126:
raise ZTEFiberIsFull('olt fiber %d is full' % fiber_num)
# enter to config
@ -257,8 +251,8 @@ if __name__ == '__main__':
ip = '10.40.1.10'
try:
register_onu_ZTE_F660(
olt_ip=ip, onu_sn=b'ZTEGC0458DCE', login_passwd=(b'admin', b'2ekc3'),
onu_mac=b'cc:7b:35:8b:7:0'
olt_ip=ip, onu_sn=b'ZTEG^#*$&@&', login_passwd=(b'admin', b'password'),
onu_mac=b'MAC'
)
except ZteOltConsoleError as e:
print(e)

Loading…
Cancel
Save