Browse Source

continuing development

devel
Dmitry 9 years ago
parent
commit
889099a185
  1. 13
      agent.py
  2. 33
      agent/db.py
  3. 52
      agent/firewall.py
  4. 72
      agent/ipfw.sh
  5. 139
      agent/main.py
  6. 159
      agent/models.py
  7. 242
      agent/sslTransmitter.py
  8. 18
      install.sql
  9. 31
      shaper.sh

13
agent.py

@ -1,13 +0,0 @@
#!/bin/env python2
import os
from agent import main
if __name__ == "__main__":
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "djing.settings")
while True:
main(debug=True)
print "Exit from main, reload..."

33
agent/db.py

@ -1,33 +0,0 @@
# -*- coding:utf-8 -*-
from json import loads
import requests
from requests.exceptions import ConnectionError
from models import deserialize_tariffs, deserialize_abonents
import settings
def load_from_db():
try:
r = requests.get('%s://%s:%d/abons/api/abons' % (
'https' if settings.IS_USE_SSL else 'http',
settings.SERVER_IP,
settings.SERVER_PORT
), verify=False)
user_data = loads(r.text)
# Получаем тарифы
tariffs = deserialize_tariffs(user_data)
# Получаем пользователей
abons = deserialize_abonents(user_data, tariffs)
return abons, tariffs
except ValueError as e:
print('Error:', e, r.text)
except ConnectionError:
print("Can not connect to server %s:%d..." % (settings.SERVER_IP, settings.SERVER_PORT))
exit(0)

52
agent/firewall.py

@ -1,52 +0,0 @@
# -*- coding:utf-8 -*-
from agent.models import Abonent, Tariff
class FirewallManager(object):
f = r'/sbin/ipfw -q'
# вызывает комманду shell
def exec_cmd(self, cmd):
print cmd
# os.execv(cmd, [''])
# ставит заглушку на абонента
def set_cap(self, user):
pass
# Открывает доступ в интернет
def open_inet_door(self, user):
assert isinstance(user, Abonent)
if not user.tariff:
print u'WARNING: User does not have a tariff'
return
cmd = r"%s table 12 add %s/32 %d && %s table 13 add %s/32 %d" % (
self.f, user.ip_str(), user.tariff.tid,
self.f, user.ip_str(), user.tariff.tid + 1000
)
self.exec_cmd(cmd)
# Закрывает доступ в интернет
def close_inet_door(self, user):
assert isinstance(user, Abonent)
cmd = r"%s table 12 del %s/32 && %s table 13 del %s/32" % (
self.f, user.ip_str(),
self.f, user.ip_str()
)
self.exec_cmd(cmd)
# Создаёт тариф (пайпы, режущие скорость
def make_tariff(self, tariff):
assert isinstance(tariff, Tariff)
cmd = r"make ipfw tariff :)"
self.exec_cmd(cmd)
# Убирает тариф из фаервола
def destroy_tariff(self, tariff):
assert isinstance(tariff, Tariff)
cmd = r"destroy ipfw tariff :)"
self.exec_cmd(cmd)
def reset(self):
cmd = r"%s -f flush && %s table all flush" % (self.f, self.f)
self.exec_cmd(cmd)

72
agent/ipfw.sh

@ -1,72 +0,0 @@
#!/bin/sh
#########################################################
# ВАЖНО! Биллинг пока ограничен количеством тарифных планов
# не больше 1000
#########################################################
f="/sbin/ipfw -q"
lan=em1 # Clients
wan=em0 # Inet
${f} -f flush
${f} table all flush
sysctl net.inet.ip.fw.one_pass=0
# dns
${f} table 100 add 8.8.8.8 # google public dns
${f} table 100 add 8.8.4.4 # google public dns2
${f} table 100 add 77.88.8.8 # yandex base dns
${f} table 100 add 77.88.8.1 # yandex base dns2
# ssh access
${f} add 50 allow tcp from any to me 22
${f} add 51 allow tcp from me 22 to any
# loopback
${f} add 100 allow ip from any to any via lo0
# в таблице 100 приоритетный траффик.
# это dns, платёжки..
${f} add 500 allow ip from table\(100\) to any
${f} add 501 allow ip from any to table\(100\)
# в таблице 10 разрешённые пользователи
# блокируем трафик всем кто не в ней
${f} add 1001 deny ip from not table\(10\) to any via $wan
# если у абонентов есть внешние адреса (не через NAT)
#${f} add 1101 deny ip from any to not table\(10\) via $wan
# по 2 пайпа на тарифный план, на вход и выход
#${f} pipe 212 config bw 1152Kbit/s mask src-ip 0xffffffff noerror
#${f} pipe 213 config bw 1152Kbit/s mask dst-ip 0xffffffff noerror
# добавляем пайпы в таблицу
${f} add 2001 pipe 212 ip from table\(10\) to any via $wan
${f} add 2002 pipe 213 ip from any to table\(11\) via $wan
#----------------------
# так добавляем абонентов чтоб резать скорость, надо указать номер их пайпа
#${f} table 10 add 10.0.172.138/32 212
#${f} table 11 add 10.0.172.138/32 2212
#----------------------
# тут будем поджимать пользователей когда не хватает канала

139
agent/main.py

@ -1,139 +0,0 @@
# -*- coding:utf-8 -*-
from sys import stdout
from time import sleep
from db import load_from_db
from firewall import FirewallManager
from sslTransmitter import TransmitServer
from agent.models import Abonent, Tariff
def filter_user_by_id(users, uid):
# users = filter(lambda usr: isinstance(usr, Abonent), users)
users = filter(lambda usr: usr.uid == uid, users)
if len(users) > 0:
return users[0]
def filter_tariff_by_id(tariffs, tid):
# tariffs = filter(lambda trf: isinstance(trf, Tariff), tariffs)
tariffs = filter(lambda trf: trf.tid == tid, tariffs)
if len(tariffs) > 0:
return tariffs[0]
def create_abon(tariffs, users, event, frw):
print('SIGNAL: Create abon')
trf = filter_tariff_by_id(tariffs, int(event.dt['tarif_id']))
abon = Abonent(
int(event.id),
int(event.dt['ip']),
trf
)
users.append(abon)
if abon.is_access():
frw.open_inet_door(abon)
def main(debug=False):
users, tariffs = load_from_db()
frw = FirewallManager()
frw.reset()
# Инициализация абонентов
if debug:
print("Инициализация...")
# Открываем доступ в инет тем кто активен и у кого подключён тариф
for usr in filter(lambda usr: usr.is_active, users):
# даём услуги если можно
if usr.is_access():
# Открываем доступ в инет
frw.open_inet_door(usr)
if debug: print "Разрешён доступ в инет для:", usr.ip_str()
# Слушем в отдельном процессе сеть на предмет событий
ts = TransmitServer('127.0.0.1', 2134)
ts.start()
if debug: print("Загружено %d абонентов" % len(users))
while True:
# Загружаем события для абонентов из сети (список объектов EventNAS из models)
events = ts.get_data()
# Проходим по появившимся событиям
for event in events:
# event.toa, event.id, event.dt
# Смотрим тип события
toa = int(event.toa)
if toa == 0:
continue
# создаём абонента
elif toa == 1:
create_abon(tariffs, users, event, frw)
# Сигнал о том что инфа об абоненте изменилась, надо перечитать
elif toa == 2:
print('SIGNAL: Change abon')
usr = filter_user_by_id(users, event.id)
# если есть то меняем инфу о клиенте
if usr:
usr.deserialize(event.dt, tariffs)
# в любом случае сначала очистить всю инфу о клиенте из таблицы фаера
frw.close_inet_door(usr)
# если у абонента есть доступ то можно и в инет
if usr.is_access():
frw.open_inet_door(usr)
# Иначе создаём клиента
else:
create_abon(tariffs, users, event, frw)
# Удаляем абонента
elif toa == 3:
print('SIGNAL: Delete abon')
usr = filter_user_by_id(users, event.id)
frw.close_inet_door(usr)
users.remove(usr)
# Создаём тариф
elif toa == 4:
print('SIGNAL: Create tariff')
trf = Tariff(
int(event.dt['tid']),
float(event.dt['speedIn']),
float(event.dt['speedOut'])
)
tariffs.append(trf)
frw.make_tariff(trf)
# Обновить тарифф
elif toa == 5:
print('SIGNAL: Change tariff')
trf = filter_tariff_by_id(tariffs, int(event.dt['tarif_id']))
trf.deserialize(event.dt)
frw.destroy_tariff(trf)
frw.make_tariff(trf)
# Удалить тарифф
elif toa == 6:
print('SIGNAL: Delete tariff')
ban_users = filter(lambda usr: usr.tariff.tid == usr.tariff.tid, users)
for usr in ban_users:
frw.close_inet_door(usr)
trf = filter_tariff_by_id(tariffs, int(event.dt['tarif_id']))
tariffs.remove(trf)
elif toa == 7:
# Сигнал на перезагрузку
# Выходим из main, выше она в цикле запустится ещё раз
return
# Очищаем очередь событий
ts.clear()
# ждём время между итерациями проверки 10 сек...
sleep(10)
stdout.write('.')
stdout.flush()

159
agent/models.py

@ -1,159 +0,0 @@
# -*- coding:utf-8 -*-
import socket
import struct
from json import loads, dumps
from abc import ABCMeta, abstractmethod
class Serializer(object):
__metaclass__ = ABCMeta
@abstractmethod
def _serializable_obj(self):
"""Вернуть словарь для сериализации"""
def serialize(self):
return dumps(self._serializable_obj())
@abstractmethod
def deserialize(self, *args):
"""Надо обязательно этот метод реализовать, он много где используется.
Из JSON создать объект класса где реализуется метод"""
def serialize_tariffs(tariffs):
dt = map(lambda trf: trf._serializable_obj(), tariffs)
return dumps({'tariffs': dt})
def deserialize_tariffs(dat):
dat = loads(dat) if type(dat) == str else dat
# Распаковываем из JSON массива dat['tariffs'] объекты через метод deserialize
return map(lambda tariff: Tariff().deserialize(tariff), dat['tariffs'])
def serialize_abonents(abonents):
dt = map(lambda abn: abn._serializable_obj(), abonents)
return dumps({'subscribers': dt})
def deserialize_abonents(dat, tariffs):
dat = loads(dat) if type(dat) == str else dat
# Распаковываем из JSON массива dat['subscribers'] объекты через метод deserialize
return map(lambda abon: Abonent().deserialize(abon, tariffs), dat['subscribers'])
class Tariff(Serializer):
tid = 0
speedIn = 0.0
speedOut = 0.0
def __init__(self, tariff_id=None, speed_in=None, speed_out=None):
self.tid = tariff_id
self.speedOut = speed_out
self.speedIn = speed_in
def is_active(self):
"""возвращает активность тарифа. Если он не активен то пропустить"""
return True
def _serializable_obj(self):
return {
'id': self.tid,
'speedIn': self.speedIn,
'speedOut': self.speedOut
}
def deserialize(self, dump):
inf = loads(dump) if type(dump) == str else dump
self.speedIn = float(inf['speedIn'])
self.speedOut = float(inf['speedOut'])
self.tid = int(inf['id'])
return self
class Abonent(Serializer):
uid = 0
tariff = Tariff()
ip = 0xffffffff
# Включён-ли абонент
is_active = True
def __init__(self, uid=None, ip=None, tariff=None, is_active=True):
# none потому что может инициализироваться пустым, чтоб быть распакованным через deserialize()
if tariff:
assert isinstance(tariff, Tariff)
self.ip = ip
self.uid = uid
self.tariff = tariff
self.is_active = is_active
def ip_str(self):
return socket.inet_ntoa(struct.pack("!I", self.ip))
def _serializable_obj(self):
return {
'id': self.uid,
'is_active': bool(self.is_active),
'ip': self.ip,
'tarif_id': self.tariff.tid if self.tariff else 0
}
def deserialize(self, dump, tariffs):
# фильтруем только элементы нужного типа
tariffs = filter(lambda trf: isinstance(trf, Tariff), tariffs)
assert len(tariffs) > 0
inf = loads(dump) if type(dump) == str else dump
self.uid = int(inf['id'])
self.is_active = bool(inf['is_active'])
self.ip = int(inf['ip'])
tarif_id = int(inf['tarif_id'])
dbtrf = filter(lambda trf: trf.tid == tarif_id, tariffs)
if len(dbtrf) > 0:
self.tariff = dbtrf[0]
else:
self.tariff = None
return self
def is_access(self):
# Доступ в интернет происходит по наличию подключённого тарифа
# если тарифа нет, то и инета нет
if self.is_active and self.tariff is not None:
return True
else:
return False
class EventNAS(Serializer):
# Type Of Action
toa = 0
# id of object
id = 0
# extended data
dt = object()
def __init__(self, type_action=None, obj_id=None, ext_data=None):
self.toa = type_action
self.id = obj_id
self.dt = ext_data
def _serializable_obj(self):
if self.dt:
return {'toa': self.toa, 'id': self.id, 'dt': self.dt}
else:
return {'toa': self.toa, 'id': self.id}
def deserialize(self, dump):
try:
inf = loads(dump) if type(dump) == str else dump
except ValueError:
return
self.toa = int(inf['toa'])
self.id = int(inf['id'])
self.dt = inf.get('dt')
return self

242
agent/sslTransmitter.py

@ -1,242 +0,0 @@
# -*- coding: utf-8 -*-
import ssl
import socket
from multiprocessing import Process, Manager
import settings
from models import EventNAS, Abonent, Tariff
class NetExcept(Exception):
def __init__(self, value):
self.value = value
def __str__(self):
return repr(self.value)
class SSLTransmitterServer(object):
bindsocket = None
def connect(self, ip, port):
self.bindsocket = socket.socket()
self.bindsocket.bind((ip, port))
self.bindsocket.listen(5)
@staticmethod
def _on_data_recive(v, data):
print "do_something:", data
# with lock:
dat = EventNAS().deserialize(data)
if dat is not None:
v.append(dat)
else:
print 'ERROR: bad data:', data
return False
def _deal_with_client(self, connstream, v):
data = connstream.read()
while data:
if not self._on_data_recive(v, data):
break
data = connstream.read()
def process(self, v):
while True:
newsocket, fromaddr = self.bindsocket.accept()
connstream = ssl.wrap_socket(newsocket,
server_side=True,
certfile=settings.CERTFILE,
keyfile=settings.KEYFILE)
try:
self._deal_with_client(connstream, v)
finally:
connstream.shutdown(socket.SHUT_RDWR)
connstream.close()
class PlainTransmitterServer(SSLTransmitterServer):
def process(self, v):
while True:
newsocket, fromaddr = self.bindsocket.accept()
dat = newsocket.recv(0xffff)
if not dat:
break
self._on_data_recive(v, dat)
# Декоратор переводит классы абонента базы к объекту агента если надо.
# abonapp.models.Abon -> agent.Abonent
def agent_abon_typer(fn):
def wrapped(self, abon):
if isinstance(abon, Abonent):
fn(self, abon)
else:
act_tar = abon.active_tariff()
agent_tariff = Tariff(act_tar.id, act_tar.speedIn, act_tar.speedOut) if act_tar else None
abn = Abonent(
abon.id,
abon.ip_address.int_ip() if abon.ip_address else 0,
agent_tariff,
abon.is_active
)
fn(self, abn)
return wrapped
# Декоратор переводит классы тарифа базы к объекту агента если надо.
# tariff_app.models.Tariff -> agent.Tariff
def agent_tariff_typer(fn):
def wrapped(self, tariff):
if isinstance(tariff, Tariff):
fn(self, tariff)
else:
trf = Tariff(
tariff.id,
tariff.speedIn,
tariff.speedOut
)
fn(self, trf)
return wrapped
class SSLTransmitterClient(object):
s = None
def __init__(self, ip=None, port=None):
try:
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
# Require a certificate from the server. We used a self-signed certificate
# so here ca_certs must be the server certificate itself.
self.s = ssl.wrap_socket(s,
ca_certs=settings.CERTFILE,
cert_reqs=ssl.CERT_REQUIRED)
self.s.connect((
ip or settings.SELF_IP,
port or settings.SELF_PORT
))
except socket.error:
raise NetExcept('Ошибка подключения к NAS агенту %s:%d' % (
ip or settings.SELF_IP,
port or settings.SELF_PORT
))
def write(self, d):
self.s.write(d)
# Создаём абонента
@agent_abon_typer
def signal_abon_create(self, abon):
self.write(
EventNAS(1, abon.id, abon._serializable_obj()).serialize()
)
# Обновляем абонента
@agent_abon_typer
def signal_abon_refresh(self, abon):
self.write(
EventNAS(2, abon.uid, abon._serializable_obj()).serialize()
)
# Удаляем абонента
@agent_abon_typer
def signal_abon_remove(self, abon):
self.write(
EventNAS(3, abon.id).serialize()
)
# Создаём тариф
@agent_tariff_typer
def signal_tariff_create(self, tariff):
self.write(
EventNAS(4, tariff.tid, tariff._serializable_obj()).serialize()
)
# Обновляем тариф
@agent_tariff_typer
def signal_tariff_refresh(self, tariff):
self.write(
EventNAS(5, tariff.tid, tariff._serializable_obj()).serialize()
)
# Удаляем тариф
@agent_tariff_typer
def signal_tariff_remove(self, tariff):
self.write(
EventNAS(6, tariff.tid).serialize()
)
# Перезагружаем всё
@agent_abon_typer
def signal_agent_reboot(self):
self.write(
EventNAS(7, 0).serialize()
)
def __del__(self):
if self.s:
self.s.close()
class PlainTransmitterClient(SSLTransmitterClient):
def __init__(self, ip=None, port=None):
try:
self.s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.s.connect((
ip or settings.SELF_IP,
port or settings.SELF_PORT
))
except socket.error:
raise NetExcept(u'Ошибка подключения к NAS агенту на %s:%d' % (
ip or settings.SELF_IP,
port or settings.SELF_PORT
))
def write(self, d):
self.s.send(d)
# общалка с NAS'ом
def get_TransmitterClientKlass():
if settings.IS_USE_SSL:
return SSLTransmitterClient
else:
return PlainTransmitterClient
def get_TransmitterServerKlass():
if settings.IS_USE_SSL:
return SSLTransmitterServer
else:
return PlainTransmitterServer
def proc_entrypoint(obj, v, lock, ip, port):
srv = get_TransmitterServerKlass()()
srv.connect(ip, port)
srv.process(v)
class TransmitServer(object):
def __init__(self, ip, port):
mngr = Manager()
self.v = mngr.list()
# self.lock = Lock()
self.p = Process(target=proc_entrypoint, args=(self, self.v, None, ip, port)) #self.lock))
def get_data(self):
if len(self.v) > 0:
return list(self.v)
else:
return []
def clear(self):
del self.v[:]
def start(self):
self.p.start()
def __del__(self):
self.p.terminate()

18
install.sql

@ -1,18 +0,0 @@
CREATE TABLE flowstat (
`id` INT(10) AUTO_INCREMENT NOT NULL,
`src_ip` CHAR(8) NOT NULL,
`dst_ip` CHAR(8) NOT NULL,
`proto` SMALLINT(2) UNSIGNED NOT NULL DEFAULT 0,
`src_port` SMALLINT(5) UNSIGNED NOT NULL DEFAULT 0,
`dst_port` SMALLINT(5) UNSIGNED NOT NULL DEFAULT 0,
`octets` INT UNSIGNED NOT NULL DEFAULT 0,
`packets` INT UNSIGNED NOT NULL DEFAULT 0,
PRIMARY KEY (`id`)
)
ENGINE =MyISAM
DEFAULT CHARSET =utf8;
INSERT INTO flowstat (`src_ip`, `dst_ip`, `proto`, `src_port`, `dst_port`, `octets`, `packets`) VALUES
('c0a80201', 'c0a805ba', 6, 49150, 443, 5281, 13),
('c0a80201', 'c0a805ba', 6, 49150, 443, 5281, 13)

31
shaper.sh

@ -1,31 +0,0 @@
#!/usr/bin/bash
IfNet=em0
IfUsr=em1
f=/sbin/ipfw
${f} -f flush
${f} -f pipe flush
${f} -f table all flush
# Разрешаем ICMP
${f} add 50 allow icmp from any to any
# список разрешённых пользователей - table15
${f} add 501 allow ip from "table(15)" to any out recv ${IfUsr} xmit ${IfNet}
# На каждый тарифный план по пайпу
${f} pipe 212 config bw 1152Kbit/s mask src-ip 0xffffffff noerror
${f} pipe 213 config bw 1152Kbit/s mask dst-ip 0xffffffff noerror
# создаём эти пайпы
${f} add 1001 pipe tablearg ip from "table(12)" to any out recv ${IfUsr} xmit ${IfNet}
${f} add 1002 pipe tablearg ip from any to "table(13)" out recv ${IfNet} xmit ${IfUsr}
# ------- Так добавляются пользователи
${f} table 12 add 10.0.172.138/32 212
${f} table 13 add 10.0.172.138/32 213
Loading…
Cancel
Save