From 889099a185b66b62c573f610642e3d0a30bc2415 Mon Sep 17 00:00:00 2001 From: Dmitry Date: Mon, 26 Dec 2016 00:32:07 +0000 Subject: [PATCH] continuing development --- agent.py | 13 --- agent/db.py | 33 ------ agent/firewall.py | 52 --------- agent/ipfw.sh | 72 ------------ agent/main.py | 139 ----------------------- agent/models.py | 159 -------------------------- agent/sslTransmitter.py | 242 ---------------------------------------- install.sql | 18 --- shaper.sh | 31 ----- 9 files changed, 759 deletions(-) delete mode 100755 agent.py delete mode 100644 agent/db.py delete mode 100644 agent/firewall.py delete mode 100644 agent/ipfw.sh delete mode 100644 agent/main.py delete mode 100644 agent/models.py delete mode 100644 agent/sslTransmitter.py delete mode 100644 install.sql delete mode 100644 shaper.sh diff --git a/agent.py b/agent.py deleted file mode 100755 index 48e8255..0000000 --- a/agent.py +++ /dev/null @@ -1,13 +0,0 @@ -#!/bin/env python2 - -import os - -from agent import main - - -if __name__ == "__main__": - os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djing.settings") - - while True: - main(debug=True) - print "Exit from main, reload..." diff --git a/agent/db.py b/agent/db.py deleted file mode 100644 index 48ffea9..0000000 --- a/agent/db.py +++ /dev/null @@ -1,33 +0,0 @@ -# -*- coding:utf-8 -*- -from json import loads - -import requests -from requests.exceptions import ConnectionError - -from models import deserialize_tariffs, deserialize_abonents -import settings - - -def load_from_db(): - try: - r = requests.get('%s://%s:%d/abons/api/abons' % ( - 'https' if settings.IS_USE_SSL else 'http', - settings.SERVER_IP, - settings.SERVER_PORT - ), verify=False) - user_data = loads(r.text) - - # Получаем тарифы - tariffs = deserialize_tariffs(user_data) - - # Получаем пользователей - abons = deserialize_abonents(user_data, tariffs) - - return abons, tariffs - - except ValueError as e: - print('Error:', e, r.text) - - except ConnectionError: - print("Can not connect to server %s:%d..." % (settings.SERVER_IP, settings.SERVER_PORT)) - exit(0) diff --git a/agent/firewall.py b/agent/firewall.py deleted file mode 100644 index 8d25ad2..0000000 --- a/agent/firewall.py +++ /dev/null @@ -1,52 +0,0 @@ -# -*- coding:utf-8 -*- -from agent.models import Abonent, Tariff - - -class FirewallManager(object): - f = r'/sbin/ipfw -q' - - # вызывает комманду shell - def exec_cmd(self, cmd): - print cmd - # os.execv(cmd, ['']) - - # ставит заглушку на абонента - def set_cap(self, user): - pass - - # Открывает доступ в интернет - def open_inet_door(self, user): - assert isinstance(user, Abonent) - if not user.tariff: - print u'WARNING: User does not have a tariff' - return - cmd = r"%s table 12 add %s/32 %d && %s table 13 add %s/32 %d" % ( - self.f, user.ip_str(), user.tariff.tid, - self.f, user.ip_str(), user.tariff.tid + 1000 - ) - self.exec_cmd(cmd) - - # Закрывает доступ в интернет - def close_inet_door(self, user): - assert isinstance(user, Abonent) - cmd = r"%s table 12 del %s/32 && %s table 13 del %s/32" % ( - self.f, user.ip_str(), - self.f, user.ip_str() - ) - self.exec_cmd(cmd) - - # Создаёт тариф (пайпы, режущие скорость - def make_tariff(self, tariff): - assert isinstance(tariff, Tariff) - cmd = r"make ipfw tariff :)" - self.exec_cmd(cmd) - - # Убирает тариф из фаервола - def destroy_tariff(self, tariff): - assert isinstance(tariff, Tariff) - cmd = r"destroy ipfw tariff :)" - self.exec_cmd(cmd) - - def reset(self): - cmd = r"%s -f flush && %s table all flush" % (self.f, self.f) - self.exec_cmd(cmd) diff --git a/agent/ipfw.sh b/agent/ipfw.sh deleted file mode 100644 index 4e9c550..0000000 --- a/agent/ipfw.sh +++ /dev/null @@ -1,72 +0,0 @@ -#!/bin/sh - -######################################################### -# ВАЖНО! Биллинг пока ограничен количеством тарифных планов -# не больше 1000 -######################################################### - - - - -f="/sbin/ipfw -q" - -lan=em1 # Clients -wan=em0 # Inet - - -${f} -f flush -${f} table all flush - -sysctl net.inet.ip.fw.one_pass=0 - -# dns -${f} table 100 add 8.8.8.8 # google public dns -${f} table 100 add 8.8.4.4 # google public dns2 -${f} table 100 add 77.88.8.8 # yandex base dns -${f} table 100 add 77.88.8.1 # yandex base dns2 - - -# ssh access -${f} add 50 allow tcp from any to me 22 -${f} add 51 allow tcp from me 22 to any - - -# loopback -${f} add 100 allow ip from any to any via lo0 - - -# в таблице 100 приоритетный траффик. -# это dns, платёжки.. -${f} add 500 allow ip from table\(100\) to any -${f} add 501 allow ip from any to table\(100\) - - - -# в таблице 10 разрешённые пользователи -# блокируем трафик всем кто не в ней -${f} add 1001 deny ip from not table\(10\) to any via $wan - -# если у абонентов есть внешние адреса (не через NAT) -#${f} add 1101 deny ip from any to not table\(10\) via $wan - - - - -# по 2 пайпа на тарифный план, на вход и выход -#${f} pipe 212 config bw 1152Kbit/s mask src-ip 0xffffffff noerror -#${f} pipe 213 config bw 1152Kbit/s mask dst-ip 0xffffffff noerror - -# добавляем пайпы в таблицу -${f} add 2001 pipe 212 ip from table\(10\) to any via $wan -${f} add 2002 pipe 213 ip from any to table\(11\) via $wan - -#---------------------- -# так добавляем абонентов чтоб резать скорость, надо указать номер их пайпа -#${f} table 10 add 10.0.172.138/32 212 -#${f} table 11 add 10.0.172.138/32 2212 -#---------------------- - - - - -# тут будем поджимать пользователей когда не хватает канала diff --git a/agent/main.py b/agent/main.py deleted file mode 100644 index 290d7a4..0000000 --- a/agent/main.py +++ /dev/null @@ -1,139 +0,0 @@ -# -*- coding:utf-8 -*- -from sys import stdout -from time import sleep - -from db import load_from_db -from firewall import FirewallManager -from sslTransmitter import TransmitServer -from agent.models import Abonent, Tariff - - -def filter_user_by_id(users, uid): - # users = filter(lambda usr: isinstance(usr, Abonent), users) - users = filter(lambda usr: usr.uid == uid, users) - if len(users) > 0: - return users[0] - - -def filter_tariff_by_id(tariffs, tid): - # tariffs = filter(lambda trf: isinstance(trf, Tariff), tariffs) - tariffs = filter(lambda trf: trf.tid == tid, tariffs) - if len(tariffs) > 0: - return tariffs[0] - - -def create_abon(tariffs, users, event, frw): - print('SIGNAL: Create abon') - trf = filter_tariff_by_id(tariffs, int(event.dt['tarif_id'])) - abon = Abonent( - int(event.id), - int(event.dt['ip']), - trf - ) - users.append(abon) - if abon.is_access(): - frw.open_inet_door(abon) - - -def main(debug=False): - users, tariffs = load_from_db() - frw = FirewallManager() - frw.reset() - - # Инициализация абонентов - if debug: - print("Инициализация...") - # Открываем доступ в инет тем кто активен и у кого подключён тариф - for usr in filter(lambda usr: usr.is_active, users): - - # даём услуги если можно - if usr.is_access(): - # Открываем доступ в инет - frw.open_inet_door(usr) - if debug: print "Разрешён доступ в инет для:", usr.ip_str() - - # Слушем в отдельном процессе сеть на предмет событий - ts = TransmitServer('127.0.0.1', 2134) - ts.start() - - if debug: print("Загружено %d абонентов" % len(users)) - - while True: - # Загружаем события для абонентов из сети (список объектов EventNAS из models) - events = ts.get_data() - # Проходим по появившимся событиям - for event in events: - # event.toa, event.id, event.dt - - # Смотрим тип события - toa = int(event.toa) - if toa == 0: - continue - - # создаём абонента - elif toa == 1: - create_abon(tariffs, users, event, frw) - - # Сигнал о том что инфа об абоненте изменилась, надо перечитать - elif toa == 2: - print('SIGNAL: Change abon') - usr = filter_user_by_id(users, event.id) - # если есть то меняем инфу о клиенте - if usr: - usr.deserialize(event.dt, tariffs) - # в любом случае сначала очистить всю инфу о клиенте из таблицы фаера - frw.close_inet_door(usr) - # если у абонента есть доступ то можно и в инет - if usr.is_access(): - frw.open_inet_door(usr) - # Иначе создаём клиента - else: - create_abon(tariffs, users, event, frw) - - # Удаляем абонента - elif toa == 3: - print('SIGNAL: Delete abon') - usr = filter_user_by_id(users, event.id) - frw.close_inet_door(usr) - users.remove(usr) - - # Создаём тариф - elif toa == 4: - print('SIGNAL: Create tariff') - trf = Tariff( - int(event.dt['tid']), - float(event.dt['speedIn']), - float(event.dt['speedOut']) - ) - tariffs.append(trf) - frw.make_tariff(trf) - - # Обновить тарифф - elif toa == 5: - print('SIGNAL: Change tariff') - trf = filter_tariff_by_id(tariffs, int(event.dt['tarif_id'])) - trf.deserialize(event.dt) - frw.destroy_tariff(trf) - frw.make_tariff(trf) - - # Удалить тарифф - elif toa == 6: - print('SIGNAL: Delete tariff') - ban_users = filter(lambda usr: usr.tariff.tid == usr.tariff.tid, users) - for usr in ban_users: - frw.close_inet_door(usr) - trf = filter_tariff_by_id(tariffs, int(event.dt['tarif_id'])) - tariffs.remove(trf) - - elif toa == 7: - # Сигнал на перезагрузку - # Выходим из main, выше она в цикле запустится ещё раз - return - - # Очищаем очередь событий - ts.clear() - - # ждём время между итерациями проверки 10 сек... - sleep(10) - stdout.write('.') - stdout.flush() diff --git a/agent/models.py b/agent/models.py deleted file mode 100644 index 269fc37..0000000 --- a/agent/models.py +++ /dev/null @@ -1,159 +0,0 @@ -# -*- coding:utf-8 -*- -import socket -import struct -from json import loads, dumps -from abc import ABCMeta, abstractmethod - - -class Serializer(object): - __metaclass__ = ABCMeta - - @abstractmethod - def _serializable_obj(self): - """Вернуть словарь для сериализации""" - - def serialize(self): - return dumps(self._serializable_obj()) - - @abstractmethod - def deserialize(self, *args): - """Надо обязательно этот метод реализовать, он много где используется. - Из JSON создать объект класса где реализуется метод""" - - -def serialize_tariffs(tariffs): - dt = map(lambda trf: trf._serializable_obj(), tariffs) - return dumps({'tariffs': dt}) - - -def deserialize_tariffs(dat): - dat = loads(dat) if type(dat) == str else dat - # Распаковываем из JSON массива dat['tariffs'] объекты через метод deserialize - return map(lambda tariff: Tariff().deserialize(tariff), dat['tariffs']) - - -def serialize_abonents(abonents): - dt = map(lambda abn: abn._serializable_obj(), abonents) - return dumps({'subscribers': dt}) - - -def deserialize_abonents(dat, tariffs): - dat = loads(dat) if type(dat) == str else dat - # Распаковываем из JSON массива dat['subscribers'] объекты через метод deserialize - return map(lambda abon: Abonent().deserialize(abon, tariffs), dat['subscribers']) - - -class Tariff(Serializer): - tid = 0 - speedIn = 0.0 - speedOut = 0.0 - - def __init__(self, tariff_id=None, speed_in=None, speed_out=None): - self.tid = tariff_id - self.speedOut = speed_out - self.speedIn = speed_in - - def is_active(self): - """возвращает активность тарифа. Если он не активен то пропустить""" - return True - - def _serializable_obj(self): - return { - 'id': self.tid, - 'speedIn': self.speedIn, - 'speedOut': self.speedOut - } - - def deserialize(self, dump): - inf = loads(dump) if type(dump) == str else dump - self.speedIn = float(inf['speedIn']) - self.speedOut = float(inf['speedOut']) - self.tid = int(inf['id']) - return self - - -class Abonent(Serializer): - uid = 0 - tariff = Tariff() - ip = 0xffffffff - - # Включён-ли абонент - is_active = True - - def __init__(self, uid=None, ip=None, tariff=None, is_active=True): - # none потому что может инициализироваться пустым, чтоб быть распакованным через deserialize() - if tariff: - assert isinstance(tariff, Tariff) - self.ip = ip - self.uid = uid - self.tariff = tariff - self.is_active = is_active - - def ip_str(self): - return socket.inet_ntoa(struct.pack("!I", self.ip)) - - def _serializable_obj(self): - return { - 'id': self.uid, - 'is_active': bool(self.is_active), - 'ip': self.ip, - 'tarif_id': self.tariff.tid if self.tariff else 0 - } - - def deserialize(self, dump, tariffs): - # фильтруем только элементы нужного типа - tariffs = filter(lambda trf: isinstance(trf, Tariff), tariffs) - assert len(tariffs) > 0 - - inf = loads(dump) if type(dump) == str else dump - self.uid = int(inf['id']) - self.is_active = bool(inf['is_active']) - self.ip = int(inf['ip']) - - tarif_id = int(inf['tarif_id']) - dbtrf = filter(lambda trf: trf.tid == tarif_id, tariffs) - if len(dbtrf) > 0: - self.tariff = dbtrf[0] - else: - self.tariff = None - return self - - def is_access(self): - # Доступ в интернет происходит по наличию подключённого тарифа - # если тарифа нет, то и инета нет - if self.is_active and self.tariff is not None: - return True - else: - return False - - -class EventNAS(Serializer): - # Type Of Action - toa = 0 - - # id of object - id = 0 - - # extended data - dt = object() - - def __init__(self, type_action=None, obj_id=None, ext_data=None): - self.toa = type_action - self.id = obj_id - self.dt = ext_data - - def _serializable_obj(self): - if self.dt: - return {'toa': self.toa, 'id': self.id, 'dt': self.dt} - else: - return {'toa': self.toa, 'id': self.id} - - def deserialize(self, dump): - try: - inf = loads(dump) if type(dump) == str else dump - except ValueError: - return - self.toa = int(inf['toa']) - self.id = int(inf['id']) - self.dt = inf.get('dt') - return self diff --git a/agent/sslTransmitter.py b/agent/sslTransmitter.py deleted file mode 100644 index b5b8862..0000000 --- a/agent/sslTransmitter.py +++ /dev/null @@ -1,242 +0,0 @@ -# -*- coding: utf-8 -*- -import ssl -import socket -from multiprocessing import Process, Manager - -import settings -from models import EventNAS, Abonent, Tariff - - -class NetExcept(Exception): - def __init__(self, value): - self.value = value - - def __str__(self): - return repr(self.value) - - -class SSLTransmitterServer(object): - bindsocket = None - - def connect(self, ip, port): - self.bindsocket = socket.socket() - self.bindsocket.bind((ip, port)) - self.bindsocket.listen(5) - - @staticmethod - def _on_data_recive(v, data): - print "do_something:", data - # with lock: - dat = EventNAS().deserialize(data) - if dat is not None: - v.append(dat) - else: - print 'ERROR: bad data:', data - return False - - def _deal_with_client(self, connstream, v): - data = connstream.read() - while data: - if not self._on_data_recive(v, data): - break - data = connstream.read() - - def process(self, v): - while True: - newsocket, fromaddr = self.bindsocket.accept() - connstream = ssl.wrap_socket(newsocket, - server_side=True, - certfile=settings.CERTFILE, - keyfile=settings.KEYFILE) - try: - self._deal_with_client(connstream, v) - finally: - connstream.shutdown(socket.SHUT_RDWR) - connstream.close() - - -class PlainTransmitterServer(SSLTransmitterServer): - def process(self, v): - while True: - newsocket, fromaddr = self.bindsocket.accept() - dat = newsocket.recv(0xffff) - if not dat: - break - self._on_data_recive(v, dat) - - -# Декоратор переводит классы абонента базы к объекту агента если надо. -# abonapp.models.Abon -> agent.Abonent -def agent_abon_typer(fn): - def wrapped(self, abon): - if isinstance(abon, Abonent): - fn(self, abon) - else: - act_tar = abon.active_tariff() - agent_tariff = Tariff(act_tar.id, act_tar.speedIn, act_tar.speedOut) if act_tar else None - abn = Abonent( - abon.id, - abon.ip_address.int_ip() if abon.ip_address else 0, - agent_tariff, - abon.is_active - ) - fn(self, abn) - - return wrapped - - -# Декоратор переводит классы тарифа базы к объекту агента если надо. -# tariff_app.models.Tariff -> agent.Tariff -def agent_tariff_typer(fn): - def wrapped(self, tariff): - if isinstance(tariff, Tariff): - fn(self, tariff) - else: - trf = Tariff( - tariff.id, - tariff.speedIn, - tariff.speedOut - ) - fn(self, trf) - - return wrapped - - -class SSLTransmitterClient(object): - s = None - - def __init__(self, ip=None, port=None): - try: - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - # Require a certificate from the server. We used a self-signed certificate - # so here ca_certs must be the server certificate itself. - self.s = ssl.wrap_socket(s, - ca_certs=settings.CERTFILE, - cert_reqs=ssl.CERT_REQUIRED) - self.s.connect(( - ip or settings.SELF_IP, - port or settings.SELF_PORT - )) - except socket.error: - raise NetExcept('Ошибка подключения к NAS агенту %s:%d' % ( - ip or settings.SELF_IP, - port or settings.SELF_PORT - )) - - def write(self, d): - self.s.write(d) - - # Создаём абонента - @agent_abon_typer - def signal_abon_create(self, abon): - self.write( - EventNAS(1, abon.id, abon._serializable_obj()).serialize() - ) - - # Обновляем абонента - @agent_abon_typer - def signal_abon_refresh(self, abon): - self.write( - EventNAS(2, abon.uid, abon._serializable_obj()).serialize() - ) - - # Удаляем абонента - @agent_abon_typer - def signal_abon_remove(self, abon): - self.write( - EventNAS(3, abon.id).serialize() - ) - - # Создаём тариф - @agent_tariff_typer - def signal_tariff_create(self, tariff): - self.write( - EventNAS(4, tariff.tid, tariff._serializable_obj()).serialize() - ) - - # Обновляем тариф - @agent_tariff_typer - def signal_tariff_refresh(self, tariff): - self.write( - EventNAS(5, tariff.tid, tariff._serializable_obj()).serialize() - ) - - # Удаляем тариф - @agent_tariff_typer - def signal_tariff_remove(self, tariff): - self.write( - EventNAS(6, tariff.tid).serialize() - ) - - # Перезагружаем всё - @agent_abon_typer - def signal_agent_reboot(self): - self.write( - EventNAS(7, 0).serialize() - ) - - def __del__(self): - if self.s: - self.s.close() - - -class PlainTransmitterClient(SSLTransmitterClient): - def __init__(self, ip=None, port=None): - try: - self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.s.connect(( - ip or settings.SELF_IP, - port or settings.SELF_PORT - )) - except socket.error: - raise NetExcept(u'Ошибка подключения к NAS агенту на %s:%d' % ( - ip or settings.SELF_IP, - port or settings.SELF_PORT - )) - - def write(self, d): - self.s.send(d) - - -# общалка с NAS'ом -def get_TransmitterClientKlass(): - if settings.IS_USE_SSL: - return SSLTransmitterClient - else: - return PlainTransmitterClient - - -def get_TransmitterServerKlass(): - if settings.IS_USE_SSL: - return SSLTransmitterServer - else: - return PlainTransmitterServer - - -def proc_entrypoint(obj, v, lock, ip, port): - srv = get_TransmitterServerKlass()() - srv.connect(ip, port) - srv.process(v) - - -class TransmitServer(object): - def __init__(self, ip, port): - mngr = Manager() - self.v = mngr.list() - # self.lock = Lock() - self.p = Process(target=proc_entrypoint, args=(self, self.v, None, ip, port)) #self.lock)) - - def get_data(self): - if len(self.v) > 0: - return list(self.v) - else: - return [] - - def clear(self): - del self.v[:] - - def start(self): - self.p.start() - - def __del__(self): - self.p.terminate() diff --git a/install.sql b/install.sql deleted file mode 100644 index 7324647..0000000 --- a/install.sql +++ /dev/null @@ -1,18 +0,0 @@ -CREATE TABLE flowstat ( - `id` INT(10) AUTO_INCREMENT NOT NULL, - `src_ip` CHAR(8) NOT NULL, - `dst_ip` CHAR(8) NOT NULL, - `proto` SMALLINT(2) UNSIGNED NOT NULL DEFAULT 0, - `src_port` SMALLINT(5) UNSIGNED NOT NULL DEFAULT 0, - `dst_port` SMALLINT(5) UNSIGNED NOT NULL DEFAULT 0, - `octets` INT UNSIGNED NOT NULL DEFAULT 0, - `packets` INT UNSIGNED NOT NULL DEFAULT 0, - PRIMARY KEY (`id`) -) - ENGINE =MyISAM - DEFAULT CHARSET =utf8; - - -INSERT INTO flowstat (`src_ip`, `dst_ip`, `proto`, `src_port`, `dst_port`, `octets`, `packets`) VALUES - ('c0a80201', 'c0a805ba', 6, 49150, 443, 5281, 13), - ('c0a80201', 'c0a805ba', 6, 49150, 443, 5281, 13) diff --git a/shaper.sh b/shaper.sh deleted file mode 100644 index 9e41c35..0000000 --- a/shaper.sh +++ /dev/null @@ -1,31 +0,0 @@ -#!/usr/bin/bash - -IfNet=em0 -IfUsr=em1 - -f=/sbin/ipfw - -${f} -f flush -${f} -f pipe flush -${f} -f table all flush - - -# Разрешаем ICMP -${f} add 50 allow icmp from any to any - - -# список разрешённых пользователей - table15 -${f} add 501 allow ip from "table(15)" to any out recv ${IfUsr} xmit ${IfNet} - - -# На каждый тарифный план по пайпу -${f} pipe 212 config bw 1152Kbit/s mask src-ip 0xffffffff noerror -${f} pipe 213 config bw 1152Kbit/s mask dst-ip 0xffffffff noerror - -# создаём эти пайпы -${f} add 1001 pipe tablearg ip from "table(12)" to any out recv ${IfUsr} xmit ${IfNet} -${f} add 1002 pipe tablearg ip from any to "table(13)" out recv ${IfNet} xmit ${IfUsr} - -# ------- Так добавляются пользователи -${f} table 12 add 10.0.172.138/32 212 -${f} table 13 add 10.0.172.138/32 213