#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""AGI script that selects the outgoing trunk for a dialed destination.

The script reads the destination from the first AGI argument, picks a trunk
according to the priority / max_calls rules declared in ``_trunks``, records
the choice in a SQLite database and exposes the result through the
``BEST_TRUNK`` variable.
"""

import sqlite3
import sys

# requirements : pyst2 (imported inside main(), see the comment there)
# save script as : /var/lib/asterisk/agi-bin/choose_optimal_trunk.py
# mkdir /var/local/wazo/
# chown asterisk /var/local/wazo/

DATABASE = '/var/local/wazo/choose_optimal_trunk.db'

# Trunk rules: a lower 'priority' value is tried first; 'max_calls' caps the
# number of distinct destination numbers assigned to the trunk (None = no cap).
_trunks = {
    'freephonie': {
        'priority': 1,
        'max_calls': 99,
    },
    'trunk_principal': {
        'priority': 2,
        'max_calls': None,
    },
}


class Store:
    """SQLite-backed log mapping each destination number to its trunk."""

    def __init__(self, database=None):
        """
        Open the database and create the ``calls_log`` table if needed.

        database -- path handed to sqlite3.connect(); defaults to the
                    module-level DATABASE when omitted
        """
        if database is None:
            # Resolved at call time so a changed DATABASE global is honoured.
            database = DATABASE
        self.conn = sqlite3.connect(database)
        self.conn.execute(
            "CREATE TABLE IF NOT EXISTS calls_log ("
            "number text PRIMARY KEY, "
            "trunk text NOT NULL, "
            "date datetime DEFAULT current_timestamp, "
            "nbcalls integer DEFAULT 0);"
        )
        self.conn.commit()

    def close(self):
        """ Close the underlying database connection """
        self.conn.close()

    def find_exisiting_trunk(self, destination):
        """
        Search for a previous call to the same number
        -- returns the trunk name if destination was already seen,
           None otherwise
        """
        # Bound parameter: the destination comes from the dialplan and must
        # never be interpolated into the SQL text.
        row = self.conn.execute(
            'SELECT trunk FROM calls_log WHERE number = ?',
            (destination,)
        ).fetchone()
        if row is None:
            return None
        return row[0]

    def count_calls_for_trunk(self, trunk):
        """
        Count the distinct destination numbers recorded for this trunk
        -- returns an integer (0 when the trunk has no entry)
        """
        row = self.conn.execute(
            'SELECT count(number) FROM calls_log WHERE trunk = ?',
            (trunk,)
        ).fetchone()
        return row[0] or 0

    def set_stats_for_destination(self, destination, trunk):
        """
        Store the chosen trunk for the destination, or bump its nbcalls
        counter when the destination is already recorded on that trunk
        """
        # First call for this number creates the row with a zero counter;
        # for a known number the insert is ignored (number is the primary key).
        self.conn.execute(
            'INSERT OR IGNORE INTO calls_log (nbcalls, trunk, number) '
            'VALUES (0, ?, ?)',
            (trunk, destination)
        )
        self.conn.execute(
            'UPDATE calls_log SET nbcalls = nbcalls + 1 '
            'WHERE number = ? AND trunk = ?',
            (destination, trunk)
        )
        self.conn.commit()


class ChooseOptimalTrunk:
    """
    Object responsible for choosing the best trunk to use according to
    the priority and max_calls rules of each trunk.
    It stores the chosen trunk for a given destination in a sqlite database.
    """

    def __init__(self, store=None):
        """
        Init object

        store -- Store instance to use; a Store on the default DATABASE is
                 created when omitted
        """
        self.store = Store() if store is None else store

    def get_optimal_trunk(self, destination):
        """
        Return the preferred trunk name as a string and record the call in
        the store.

        A destination that was already seen keeps its recorded trunk.
        Otherwise trunks are tried in ascending priority order and the first
        one that is uncapped or still below max_calls is chosen.
        Returns None when every trunk has reached its max_calls cap.
        """
        known_trunk = self.store.find_exisiting_trunk(destination)
        if known_trunk:
            self.store.set_stats_for_destination(destination, known_trunk)
            return known_trunk

        # Sort trunk names by priority order
        by_priority = sorted(_trunks, key=lambda name: _trunks[name]['priority'])
        for name in by_priority:
            max_calls = _trunks[name]['max_calls']
            # Return first matching trunk
            if max_calls is None or self.store.count_calls_for_trunk(name) < max_calls:
                self.store.set_stats_for_destination(destination, name)
                return name
        return None


def main():
    """ Run the AGI dialogue: read the destination, publish BEST_TRUNK """
    # Imported here rather than at module level so the storage and selection
    # classes stay importable on hosts where pyst2 is not installed.
    from asterisk import agi

    # init AGI interface
    myagi = agi.AGI()
    myagi.verbose('script choose_optimal_trunk started')

    # Get callerid and destination
    caller = myagi.env['agi_callerid']
    destination = myagi.env['agi_arg_1']
    myagi.verbose("call from {0} to {1}".format(caller, destination))

    # Determine best trunk
    cot = ChooseOptimalTrunk()
    try:
        best_trunk = cot.get_optimal_trunk(destination=destination)
    finally:
        cot.store.close()
    myagi.verbose(
        "call from {0} to {1} should use {2}".format(caller, destination, best_trunk)
    )

    # Return trunk
    myagi.set_variable('BEST_TRUNK', best_trunk)
    myagi.verbose('script choose_optimal_trunk ended')


if __name__ == '__main__':
    main()
    sys.exit()