Browse Source

Add little bit tests

devel
bashmak 8 years ago
parent
commit
17a5088990
  1. 7
      abonapp/forms.py
  2. 6
      abonapp/models.py
  3. 7
      abonapp/pay_systems.py
  4. 46
      abonapp/tests.py
  5. 10
      abonapp/views.py
  6. 9
      accounts_app/models.py
  7. 2
      agent/downloader.py
  8. 68
      devapp/tests.py
  9. 2
      devapp/urls.py
  10. 11
      devapp/views.py
  11. 15
      djing/global_base_views.py
  12. 9
      djing/lib/decorators.py

7
abonapp/forms.py

@ -1,4 +1,3 @@
from datetime import datetime
from django.utils.translation import ugettext as _
from django import forms
from django.contrib.auth.hashers import make_password
@ -8,8 +7,6 @@ from . import models
from django.conf import settings
from djing import IP_ADDR_REGEX
TELEPHONE_REGEXP = getattr(settings, 'TELEPHONE_REGEXP', r'^(\+[7,8,9,3]\d{10,11})?$')
def generate_random_chars(length=6, chars=digits, split=2, delimiter=''):
username = ''.join(choice(chars) for i in range(length))
@ -73,7 +70,7 @@ class AbonForm(forms.ModelForm):
}),
'telephone': forms.TextInput(attrs={
'placeholder': _('telephone placeholder'),
'pattern': TELEPHONE_REGEXP
'pattern': getattr(settings, 'TELEPHONE_REGEXP', r'^(\+[7,8,9,3]\d{10,11})?$')
}),
'description': forms.Textarea(attrs={'rows': '4'}),
'is_active': forms.NullBooleanSelect(attrs={'class': 'form-control'})
@ -137,7 +134,7 @@ class AdditionalTelephoneForm(forms.ModelForm):
widgets = {
'telephone': forms.TextInput(attrs={
'placeholder': _('telephone placeholder'),
'pattern': TELEPHONE_REGEXP,
'pattern': getattr(settings, 'TELEPHONE_REGEXP', r'^(\+[7,8,9,3]\d{10,11})?$'),
'required': '',
'class': 'form-control'
}),

6
abonapp/models.py

@ -20,8 +20,6 @@ from djing import IP_ADDR_REGEX
from tariff_app.models import Tariff, PeriodicPay
from bitfield import BitField
TELEPHONE_REGEXP = getattr(settings, 'TELEPHONE_REGEXP', r'^(\+[7,8,9,3]\d{10,11})?$')
class AbonLog(models.Model):
abon = models.ForeignKey('Abon', models.CASCADE)
@ -402,7 +400,9 @@ class AdditionalTelephone(models.Model):
max_length=16,
verbose_name=_('Telephone'),
# unique=True,
validators=(RegexValidator(TELEPHONE_REGEXP),)
validators=(RegexValidator(
getattr(settings, 'TELEPHONE_REGEXP', r'^(\+[7,8,9,3]\d{10,11})?$')
),)
)
owner_name = models.CharField(max_length=127)

7
abonapp/pay_systems.py

@ -6,9 +6,6 @@ from django.db import DatabaseError
from django.conf import settings
from xmlview.decorators import xml_view
SECRET = getattr(settings, 'PAY_SECRET')
SERV_ID = getattr(settings, 'PAY_SERV_ID')
@xml_view(root_node='pay-response')
def allpay(request):
@ -36,7 +33,7 @@ def allpay(request):
# check sign
md = md5()
s = '_'.join((str(act), pay_account or '', serv_id or '', pay_id, SECRET))
s = '_'.join((str(act), pay_account or '', serv_id or '', pay_id, getattr(settings, 'PAY_SECRET')))
md.update(bytes(s, 'utf-8'))
our_sign = md.hexdigest()
if our_sign != sign:
@ -50,7 +47,7 @@ def allpay(request):
'balance': ballance,
'name': fio,
'account': pay_account,
'service_id': SERV_ID,
'service_id': getattr(settings, 'PAY_SERV_ID'),
'min_amount': 10.0,
'max_amount': 5000,
'status_code': 21,

46
abonapp/tests.py

@ -13,13 +13,12 @@ from abonapp.pay_systems import allpay
from group_app.models import Group
rf = RequestFactory()
SERVICE_ID = getattr(settings, 'PAY_SERV_ID')
SECRET = getattr(settings, 'PAY_SECRET')
def _make_sign(act: int, pay_account: str, serv_id: str, pay_id):
md = md5()
s = "%d_%s_%s_%s_%s" % (act, pay_account, serv_id, pay_id, SECRET)
secret = getattr(settings, 'PAY_SECRET')
s = "%d_%s_%s_%s_%s" % (act, pay_account, serv_id, pay_id, secret)
md.update(bytes(s, 'utf-8'))
return md.hexdigest()
@ -46,13 +45,14 @@ class AllPayTestCase(TestCase):
def user_pay_view(self):
print('test_user_pay_view')
current_date = timezone.now().strftime(self.time_format)
service_id = getattr(settings, 'PAY_SERV_ID')
r = allpay(rf.get(self.pay_url, {
'ACT': 1,
'PAY_ACCOUNT': 'pay_account1',
'SERVICE_ID': SERVICE_ID,
'SERVICE_ID': service_id,
'PAY_ID': '840ab457-e7d1-4494-8197-9570da035170',
'TRADE_POINT': 'term1',
'SIGN': _make_sign(1, 'pay_account1', SERVICE_ID, '840ab457-e7d1-4494-8197-9570da035170')
'SIGN': _make_sign(1, 'pay_account1', service_id, '840ab457-e7d1-4494-8197-9570da035170')
}
))
r = r.content.decode('utf-8')
@ -61,7 +61,7 @@ class AllPayTestCase(TestCase):
"<balance>-13.12</balance>",
"<name>Test Name</name>",
"<account>pay_account1</account>",
"<service_id>%s</service_id>" % SERVICE_ID,
"<service_id>%s</service_id>" % service_id,
"<min_amount>10.0</min_amount>",
"<max_amount>5000</max_amount>",
"<status_code>21</status_code>",
@ -72,21 +72,22 @@ class AllPayTestCase(TestCase):
def user_pay_pay(self):
print('test_user_pay_pay')
current_date = timezone.now().strftime(self.time_format)
service_id = getattr(settings, 'PAY_SERV_ID')
r = allpay(rf.get(self.pay_url, {
'ACT': 4,
'PAY_ACCOUNT': 'pay_account1',
'PAY_AMOUNT': 18.21,
'RECEIPT_NUM': 2126235,
'SERVICE_ID': SERVICE_ID,
'SERVICE_ID': service_id,
'PAY_ID': '840ab457-e7d1-4494-8197-9570da035170',
'TRADE_POINT': 'term1',
'SIGN': _make_sign(4, 'pay_account1', SERVICE_ID, '840ab457-e7d1-4494-8197-9570da035170')
'SIGN': _make_sign(4, 'pay_account1', service_id, '840ab457-e7d1-4494-8197-9570da035170')
}))
r = r.content.decode('utf-8')
xml = ''.join((
"<pay-response>",
"<pay_id>840ab457-e7d1-4494-8197-9570da035170</pay_id>",
"<service_id>%s</service_id>" % SERVICE_ID,
"<service_id>%s</service_id>" % service_id,
"<amount>18.21</amount>",
"<status_code>22</status_code>",
"<time_stamp>%s</time_stamp>" % current_date,
@ -98,12 +99,13 @@ class AllPayTestCase(TestCase):
def user_pay_check(self):
print('test_user_pay_check')
current_date = timezone.now().strftime(self.time_format)
service_id = getattr(settings, 'PAY_SERV_ID')
r = allpay(rf.get(self.pay_url,
{
'ACT': 7,
'SERVICE_ID': SERVICE_ID,
'SERVICE_ID': service_id,
'PAY_ID': '840ab457-e7d1-4494-8197-9570da035170',
'SIGN': _make_sign(7, '', SERVICE_ID, '840ab457-e7d1-4494-8197-9570da035170')
'SIGN': _make_sign(7, '', service_id, '840ab457-e7d1-4494-8197-9570da035170')
}
))
r = r.content.decode('utf-8')
@ -113,7 +115,7 @@ class AllPayTestCase(TestCase):
"<time_stamp>%s</time_stamp>" % current_date,
"<transaction>",
"<pay_id>840ab457-e7d1-4494-8197-9570da035170</pay_id>",
"<service_id>%s</service_id>" % SERVICE_ID,
"<service_id>%s</service_id>" % service_id,
"<amount>18.21</amount>",
"<status>111</status>",
"<time_stamp>%s</time_stamp>" % self.test_pay_time,
@ -124,14 +126,15 @@ class AllPayTestCase(TestCase):
def check_ballance(self):
print('check_ballance')
service_id = getattr(settings, 'PAY_SERV_ID')
r = allpay(rf.get(self.pay_url,
{
'ACT': 1,
'PAY_ACCOUNT': 'pay_account1',
'SERVICE_ID': SERVICE_ID,
'SERVICE_ID': service_id,
'PAY_ID': '840ab457-e7d1-4494-8197-9570da035170',
'TRADE_POINT': 'term1',
'SIGN': _make_sign(1, 'pay_account1', SERVICE_ID, '840ab457-e7d1-4494-8197-9570da035170')
'SIGN': _make_sign(1, 'pay_account1', service_id, '840ab457-e7d1-4494-8197-9570da035170')
}
))
r = r.content.decode('utf-8')
@ -142,13 +145,14 @@ class AllPayTestCase(TestCase):
def test_client_does_not_exist(self):
print('test_client_does_not_exist')
current_date = timezone.now().strftime(self.time_format)
service_id = getattr(settings, 'PAY_SERV_ID')
r = allpay(rf.get(self.pay_url, {
'ACT': 1,
'PAY_ACCOUNT': 'not_existing_acc',
'SERVICE_ID': SERVICE_ID,
'SERVICE_ID': service_id,
'PAY_ID': '840ab457-e7d1-4494-8197-9570da035170',
'TRADE_POINT': 'term1',
'SIGN': _make_sign(1, 'not_existing_acc', SERVICE_ID, '840ab457-e7d1-4494-8197-9570da035170')
'SIGN': _make_sign(1, 'not_existing_acc', service_id, '840ab457-e7d1-4494-8197-9570da035170')
}
))
r = r.content.decode('utf-8')
@ -161,13 +165,14 @@ class AllPayTestCase(TestCase):
def try_pay_double(self):
print('try_pay_double')
service_id = getattr(settings, 'PAY_SERV_ID')
r = allpay(rf.get(self.pay_url, {
'ACT': 4,
'PAY_ACCOUNT': 'pay_account1',
'SERVICE_ID': SERVICE_ID,
'SERVICE_ID': service_id,
'PAY_ID': '840ab457-e7d1-4494-8197-9570da035170',
'TRADE_POINT': 'term1',
'SIGN': _make_sign(4, 'pay_account1', SERVICE_ID, '840ab457-e7d1-4494-8197-9570da035170')
'SIGN': _make_sign(4, 'pay_account1', service_id, '840ab457-e7d1-4494-8197-9570da035170')
}))
r = r.content.decode('utf-8')
r = parse(r)
@ -178,11 +183,12 @@ class AllPayTestCase(TestCase):
print('non_existing_pay')
current_date = timezone.now().strftime(self.time_format)
uuid = '9f154e93-d800-419a-92f7-da33529138be'
service_id = getattr(settings, 'PAY_SERV_ID')
r = allpay(rf.get(self.pay_url, {
'ACT': 7,
'SERVICE_ID': SERVICE_ID,
'SERVICE_ID': service_id,
'PAY_ID': uuid,
'SIGN': _make_sign(7, '', SERVICE_ID, uuid)
'SIGN': _make_sign(7, '', service_id, uuid)
}))
r = r.content.decode('utf-8')
xml = ''.join((

10
abonapp/views.py

@ -32,11 +32,9 @@ from djing import ping
from djing import lib
from djing.global_base_views import OrderingMixin, BaseListWithFiltering, SecureApiView
PAGINATION_ITEMS_PER_PAGE = getattr(settings, 'PAGINATION_ITEMS_PER_PAGE', 10)
class BaseAbonListView(OrderingMixin, BaseListWithFiltering):
paginate_by = PAGINATION_ITEMS_PER_PAGE
paginate_by = getattr(settings, 'PAGINATION_ITEMS_PER_PAGE', 10)
http_method_names = ('get',)
@ -472,7 +470,7 @@ def unsubscribe_service(request, gid, uname, abon_tariff_id):
@method_decorator(login_required, name='dispatch')
@method_decorator(permission_required('abonapp.can_view_abonlog'), name='dispatch')
class LogListView(ListView):
paginate_by = PAGINATION_ITEMS_PER_PAGE
paginate_by = getattr(settings, 'PAGINATION_ITEMS_PER_PAGE', 10)
http_method_names = ('get',)
context_object_name = 'logs'
template_name = 'abonapp/log.html'
@ -482,7 +480,7 @@ class LogListView(ListView):
@method_decorator(login_required, name='dispatch')
@method_decorator(permission_required('abonapp.can_view_invoiceforpayment'), name='dispatch')
class DebtorsListView(ListView):
paginate_by = PAGINATION_ITEMS_PER_PAGE
paginate_by = getattr(settings, 'PAGINATION_ITEMS_PER_PAGE', 10)
http_method_names = ('get',)
context_object_name = 'invoices'
template_name = 'abonapp/debtors.html'
@ -492,7 +490,7 @@ class DebtorsListView(ListView):
@method_decorator(login_required, name='dispatch')
@method_decorator(permission_required('group_app.can_view_group', (Group, 'pk', 'gid')), name='dispatch')
class TaskLogListView(ListView):
paginate_by = PAGINATION_ITEMS_PER_PAGE
paginate_by = getattr(settings, 'PAGINATION_ITEMS_PER_PAGE', 10)
http_method_names = ('get',)
context_object_name = 'tasks'
template_name = 'abonapp/task_log.html'

9
accounts_app/models.py

@ -8,9 +8,6 @@ from django.utils.translation import gettext_lazy as _
from django.conf import settings
from group_app.models import Group
DEFAULT_PICTURE = getattr(settings, 'DEFAULT_PICTURE', '/static/img/user_ava.gif')
TELEPHONE_REGEXP = getattr(settings, 'TELEPHONE_REGEXP', r'^(\+[7,8,9,3]\d{10,11})?$')
class MyUserManager(BaseUserManager):
def create_user(self, telephone, username, password=None):
@ -60,7 +57,9 @@ class BaseAccount(AbstractBaseUser, PermissionsMixin):
max_length=16,
verbose_name=_('Telephone'),
blank=True,
validators=(RegexValidator(TELEPHONE_REGEXP),)
validators=(RegexValidator(
getattr(settings, 'TELEPHONE_REGEXP', r'^(\+[7,8,9,3]\d{10,11})?$')
),)
)
USERNAME_FIELD = 'username'
@ -105,7 +104,7 @@ class UserProfile(BaseAccount):
if self.avatar and os.path.isfile(self.avatar.path):
return self.avatar.url
else:
return DEFAULT_PICTURE
return getattr(settings, 'DEFAULT_PICTURE', '/static/img/user_ava.gif')
def get_min_ava(self):
return self.get_big_ava()

2
agent/downloader.py

@ -2,7 +2,7 @@
from urllib import request
from hashlib import sha256
API_AUTH_SECRET = 'Your api hash'
API_AUTH_SECRET = 'piodpaosdpa_sd&3jf[owfafogjaspioha*a'
FILE_LINK = 'http://localhost:8000/dev/nagios/hosts/'
"""

68
devapp/tests.py

@ -1,22 +1,54 @@
from django.test import TestCase
from . import dev_types
from hashlib import sha256
from django.shortcuts import resolve_url
from django.test import TestCase, RequestFactory, override_settings
from accounts_app.models import UserProfile
from devapp.models import Device
from group_app.models import Group
rf = RequestFactory()
API_SECRET = 'TestApiSecret'
def calc_hash(data):
if type(data) is str:
result_data = data.encode('utf-8')
else:
result_data = bytes(data)
return sha256(result_data).hexdigest()
class DevTest(TestCase):
def setUp(self):
pass
def snmp(self):
dev = dev_types.DLinkDevice('10.115.1.105', '<community>', 2)
print(('DevName:', dev.get_device_name()))
ports = dev.get_ports()
print('gports')
for port in ports:
if not issubclass(port.__class__, dev_types.BasePort):
raise TypeError
print(('\tPort:', port.nm, port.st, port.mac(), port.sp))
# Disable 2 port
print((ports[1].disable()))
# Enable 2 port
print((ports[1].enable()))
grp = Group.objects.create(title='Grp1')
my_admin = UserProfile.objects.create_superuser('+79781234567', 'local_superuser', 'ps')
# self.client.login(username=my_admin.username, password=my_admin.password)
self.adminuser = my_admin
Device.objects.create(
ip_address='192.168.0.100',
mac_addr='78:81:f2:1f:d2:a9',
comment='Test device',
devtype='On',
man_passw='public',
group=grp
)
@override_settings(API_AUTH_SECRET=API_SECRET, API_AUTH_SUBNET='127.0.0.1')
def test_secure_api_ok(self):
self.client.force_login(self.adminuser)
sign = calc_hash(API_SECRET)
url = resolve_url('devapp:nagios_get_all_hosts')
r = self.client.get(url, {
'sign': sign
})
self.assertEqual(r.status_code, 200)
@override_settings(API_AUTH_SECRET=API_SECRET, API_AUTH_SUBNET='127.0.0.1')
def test_get_config_nagios_file(self):
self.client.force_login(self.adminuser)
sign = calc_hash(API_SECRET)
url = resolve_url('devapp:nagios_objects_conf')
r = self.client.get(url, {
'sign': sign
})
self.assertEqual(r.status_code, 200)

2
devapp/urls.py

@ -35,5 +35,5 @@ urlpatterns = [
# Nagios mon generate
url(r'^nagios/hosts/$', views.nagios_objects_conf, name='nagios_objects_conf'),
url(r'^api/getall/$', views.DevicesGetListView.as_view())
url(r'^api/getall/$', views.DevicesGetListView.as_view(), name='nagios_get_all_hosts')
]

11
devapp/views.py

@ -687,9 +687,12 @@ class OnDeviceMonitoringEvent(global_base_views.SecureApiView):
@hash_auth_view
def nagios_objects_conf(request):
def getconf(device_instance: Device):
config = device_instance.generate_config_template()
if config is not None:
return config
try:
config = device_instance.generate_config_template()
if config is not None:
return config
except DeviceImplementationError:
pass
devices_queryset = Device.objects.exclude(Q(mac_addr=None) | Q(ip_address='127.0.0.1')) \
.select_related('parent_dev') \
@ -717,7 +720,7 @@ class DevicesGetListView(global_base_views.SecureApiView):
for r in res:
if isinstance(r['mac_addr'], EUI):
r['mac_addr'] = int(r['mac_addr'])
return tuple(res)
return list(res)
@login_required

15
djing/global_base_views.py

@ -10,9 +10,6 @@ from netaddr import IPNetwork, IPAddress
from django.core.paginator import InvalidPage, EmptyPage
from djing.lib.decorators import hash_auth_view
API_AUTH_SECRET = getattr(settings, 'API_AUTH_SECRET')
API_AUTH_SUBNET = getattr(settings, 'API_AUTH_SUBNET')
class RedirectWhenError(Exception):
def __init__(self, url, failed_message=None):
@ -28,7 +25,8 @@ class RedirectWhenError(Exception):
class HashAuthView(View):
def __init__(self, *args, **kwargs):
if API_AUTH_SECRET is None or API_AUTH_SECRET == 'your api secret':
api_auth_secret = getattr(settings, 'API_AUTH_SECRET')
if api_auth_secret is None or api_auth_secret == 'your api secret':
raise NotImplementedError('You must specified API_AUTH_SECRET in settings')
else:
super(HashAuthView, self).__init__(*args, **kwargs)
@ -53,15 +51,16 @@ class AllowedSubnetMixin(object):
Return 403 denied otherwise.
"""
ip = IPAddress(request.META.get('REMOTE_ADDR'))
if type(API_AUTH_SUBNET) is str:
if ip in IPNetwork(API_AUTH_SUBNET):
api_auth_subnet = getattr(settings, 'API_AUTH_SUBNET')
if type(api_auth_subnet) is str:
if ip in IPNetwork(api_auth_subnet):
return super(AllowedSubnetMixin, self).dispatch(request, *args, **kwargs)
try:
for subnet in API_AUTH_SUBNET:
for subnet in api_auth_subnet:
if ip in IPNetwork(subnet):
return super(AllowedSubnetMixin, self).dispatch(request, *args, **kwargs)
except TypeError:
if ip in IPNetwork(str(API_AUTH_SUBNET)):
if ip in IPNetwork(str(api_auth_subnet)):
return super(AllowedSubnetMixin, self).dispatch(request, *args, **kwargs)
return HttpResponseForbidden('Access Denied')

9
djing/lib/decorators.py

@ -5,9 +5,6 @@ from django.shortcuts import redirect
from djing.lib import check_sign
DEBUG = getattr(settings, 'DEBUG', False)
API_AUTH_SECRET = getattr(settings, 'API_AUTH_SECRET')
def require_ssl(view):
"""
@ -18,7 +15,8 @@ def require_ssl(view):
@wraps(view)
def wrapper(request, *args, **kwargs):
if not DEBUG and not request.is_secure():
debug = getattr(settings, 'DEBUG', False)
if not debug and not request.is_secure():
target_url = "https://" + request.META['HTTP_HOST'] + request.path_info
return HttpResponseRedirect(target_url)
return view(request, *args, **kwargs)
@ -41,6 +39,7 @@ def only_admins(fn):
def hash_auth_view(fn):
@wraps(fn)
def wrapped(request, *args, **kwargs):
api_auth_secret = getattr(settings, 'API_AUTH_SECRET')
sign = request.GET.get('sign')
if sign is None or sign == '':
return HttpResponseForbidden('Access Denied')
@ -50,7 +49,7 @@ def hash_auth_view(fn):
del get_values['sign']
values_list = [l for l in get_values.values() if l]
values_list.sort()
values_list.append(API_AUTH_SECRET)
values_list.append(api_auth_secret)
if check_sign(values_list, sign):
return fn(request, *args, **kwargs)
else:

Loading…
Cancel
Save