#!/usr/bin/python3 import sqlite3 from pathlib import Path from sqlite3 import Error import urllib.request import json import logging import argparse import requests import os import time import sys import git from types import SimpleNamespace from datetime import datetime from string import ascii_letters, digits from rich import print from argparse import RawTextHelpFormatter from urllib.request import urlopen from operator import itemgetter # Info pages for dev # https://mailcow.docs.apiary.io/#reference/aliases/get-aliases/get-aliases # https://demo.mailcow.email/api/#/Aliases homefilepath = Path.home() filepath = homefilepath.joinpath('.config/malias') database = filepath.joinpath('malias.db') logfile = filepath.joinpath('malias.log') Path(filepath).mkdir(parents=True, exist_ok=True) logging.basicConfig(filename=logfile,level=logging.INFO,format='%(message)s') app_version = '0.4' db_version = '0.2' def get_latest_release(): req = urllib.request.Request('https://gitlab.pm/api/v1/repos/rune/malias/releases/latest') req.add_header('Content-Type', 'application/json') current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) return remoteData['tag_name'] def release_check(): latest_release = get_latest_release() if app_version != latest_release: print('[b]New version available @ [i]https://iurl.no/malias[/b][/i]') else: print ('You have the the latest version. Version: %s' %(app_version)) def connect_database(): Path(filepath).mkdir(parents=True, exist_ok=True) conn = None try: conn = sqlite3.connect(database) except Error as e: logging.error(time.strftime("%Y-%m-%d %H:%M") + ' - Error : ' + str(e)) print(e) finally: if conn: c = conn.cursor() c.execute('''CREATE TABLE IF NOT EXISTS apikey (id integer NOT NULL PRIMARY KEY, api text NOT NULL)''') c.execute('''CREATE TABLE IF NOT EXISTS settings ( id INTEGER NOT NULL PRIMARY KEY, first_run INTEGER, server TEXT, data_copy INTEGER )''') c.execute('''CREATE TABLE IF NOT EXISTS aliases (id integer NOT NULL PRIMARY KEY, alias text NOT NULL, goto text NOT NULL, created text NOT NULL)''') c.execute('''CREATE TABLE IF NOT EXISTS timedaliases (id integer NOT NULL PRIMARY KEY, alias text NOT NULL, goto text NOT NULL, validity text NOT NULL)''') conn.commit() first_run(conn) return conn def first_run(conn): now = datetime.now().strftime("%m-%d-%Y %H:%M") cursor = conn.cursor() cursor.execute('SELECT count(*) FROM settings') count = cursor.fetchone()[0] if count == 0: logging.error(now + ' - First run!') cursor.execute('INSERT INTO settings values(?,?,?,?)', (1, 1, 'dummy.server',0)) cursor.execute('INSERT INTO apikey values(?,?)', (1, 'DUMMY_KEY')) conn.commit() return None def get_settings(kind): cursor = conn.cursor() cursor.execute('SELECT * FROM settings') data = cursor.fetchall() first_run_status = data[0][1] mail_server = data[0][2] copy_status = data[0][3] if kind == 'mail_server': if mail_server == 'dummy.server': print('Error: No mailcow server active. Please add one with [b]malias -m [i]your.server[/i][/b]') exit(0) else: return mail_server if kind == 'first_run_status': return first_run_status if kind == 'copy_status': return copy_status def get_api(): latest_release = get_latest_release() cursor = conn.cursor() cursor.execute('SELECT api FROM apikey') apikey = cursor.fetchone()[0] if apikey == 'DUMMY_KEY': print('Missing API key. Please add with [b]malias -k [i]YOUR-API-KEY[/i][/b]') exit(0) else: return apikey def set_mailserver(server): now = datetime.now().strftime("%m-%d-%Y %H:%M") cursor = conn.cursor() cursor.execute('UPDATE settings SET server = ? WHERE id = 1',(server,)) logging.info(now + ' - Info : mailcow server updated') print('Your mail server has been updated.') conn.commit() def apikey(key): now = datetime.now().strftime("%m-%d-%Y %H:%M") cursor = conn.cursor() cursor.execute('UPDATE apikey SET api = ? WHERE id = 1',(key,)) logging.info(now + ' - Info : API key updated') print('Your API key has been updated.') conn.commit() def get_mail_domains(info): apikey = get_api() cursor = conn.cursor() mail_server = get_settings('mail_server') req = urllib.request.Request('https://'+mail_server+'/api/v1/get/domain/all') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) if info: total_aliases = 0 i=0 print('\n[b]malias[/b] - All email domains on %s' %(mail_server)) print('==================================================================') for domains in remoteData: cursor.execute('SELECT count(*) FROM aliases where alias like ? or goto like ?', ('%'+remoteData[i]['domain_name']+'%','%'+remoteData[i]['domain_name']+'%',)) count = cursor.fetchone()[0] total_aliases += count print('%s \t\twith %s aliases' %(remoteData[i]['domain_name'],count)) i+=1 print('\n\nThere is a total of %s domains with %s aliases.' %(str(i),str(total_aliases))) else: return(remoteData) def create(alias,to_address): now = datetime.now().strftime("%m-%d-%Y %H:%M") server = get_settings('mail_server') apikey = get_api() if checklist(alias) == True: logging.error(now + ' - Error : alias %s exists on the mailcow instance %s ' %(alias,server)) print('\n[b]Error[/b] : alias %s exists on the mailcow instance %s \n' %(alias,server)) exit(0) else: data = {'address': alias,'goto': to_address,'active': "1"} headers = {'X-API-Key': apikey, "Content-Type": "application/json"} response = requests.post('https://'+server+'/api/v1/add/alias', data=json.dumps(data), headers=headers) mail_id = alias_id(alias) if mail_id == None: logging.error(now + ' - Error : alias %s not created on the mailcow instance %s ' %(alias,server)) print('\n[b]Error[/b] : alias %s exists on the mailcow instance %s \n' %(alias,server)) else: cursor = conn.cursor() cursor.execute('INSERT INTO aliases values(?,?,?,?)', (mail_id, alias,to_address,now)) conn.commit() logging.info(now + ' - Info : alias %s created for %s on the mailcow instance %s ' %(alias,to_address,server)) print('\n[b]Info[/b] : alias %s created for %s on the mailcow instance %s \n' %(alias,to_address,server)) def create_timed(username,domain): now = datetime.now().strftime("%m-%d-%Y %H:%M") server = get_settings('mail_server') apikey = get_api() data = {'username': username,'domain': domain} headers = {'X-API-Key': apikey, "Content-Type": "application/json"} response = requests.post('https://'+server+'/api/v1/add/time_limited_alias', data=json.dumps(data), headers=headers) response_data = response.json() if response_data[0]['type'] == 'danger': if response_data[0]['msg'] == 'domain_invalid': print('\n[b]Error[/b] : the domain %s is invalid for %s \n' %(domain, server)) if response_data[0]['msg'] == 'access_denied': print('\n[b]Error[/b] : the server %s responded with [red]Access Denied[/red]\n' %(server)) else: alias = get_last_timed(username) validity = datetime.utcfromtimestamp(alias['validity']).strftime('%d-%m-%Y %H:%M') print('The timed alias %s was created. The alias is valid until %s UTC\n' %(alias['address'], validity)) cursor = conn.cursor() cursor.execute('INSERT INTO timedaliases values(?,?,?,?)', (alias['validity'],alias['address'],username,validity)) conn.commit() logging.info(now + ' - Info : timed alias %s created for %s and valid too %s UTC on the mailcow instance %s ' %(alias['address'],username,validity,server)) def delete_alias(alias): server = get_settings('mail_server') apikey = get_api() if checklist(alias) == True: the_alias_id = alias_id(alias) data = {'id': the_alias_id} headers = {'X-API-Key': apikey, "Content-Type": "application/json"} response = requests.post('https://'+server+'/api/v1/delete/alias', data=json.dumps(data), headers=headers) response_data = response.json() if response_data[0]['type'] == 'success': if check_local_db(the_alias_id) == 1: now = datetime.now().strftime("%m-%d-%Y %H:%M") cursor = conn.cursor() cursor.execute('DELETE from aliases where id = ?',(the_alias_id,)) logging.info(now + ' - Info : alias %s deleted from the mailcow instance %s and Local DB' %(alias,server)) conn.commit() print('\n[b]Info[/b] : alias %s deleted from the mailcow instance %s and local DB' %(alias,server)) else: logging.info(now + ' - Info : alias %s deleted from the mailcow instance %s.' %(alias,server)) print('\n[b]Info[/b] : alias %s deleted from the mailcow instance %s.' %(alias,server)) else: logging.error(now + ' - Error : alias %s NOT deleted from the mailcow instance %s. Error : %s' %(alias,server,response_data)) conn.commit() print('\n[b]Error[/b] : alias %s NOT deleted from the mailcow instance %s. Error : %s' %(alias,server,response_data)) else: print('\n[b]Error[/b] : The alias %s not found') def copy_data(): apikey = get_api() mail_server = get_settings('mail_server') if get_settings('copy_status') == 0: now = datetime.now().strftime("%m-%d-%Y %H:%M") cursor = conn.cursor() req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) i = 0 for data in remoteData: cursor.execute('INSERT INTO aliases values(?,?,?,?)', (remoteData[i]['id'], remoteData[i]['address'],remoteData[i]['goto'],now)) i=i+1 cursor.execute('UPDATE settings SET data_copy = ? WHERE id = 1',(1,)) conn.commit() logging.info(now + ' - Info : aliases imported from the mailcow instance %s to local DB' %(mail_server)) print('\n[b]Info[/b] : aliases imported from the mailcow instance %s to local DB\n' %(mail_server)) else: print('\n[b]Info[/b] : aliases alreday imported from the mailcow instance %s to local DB\n\n[i]Updating with any missing aliases![/i]' %(mail_server)) update_data() def update_data(): apikey = get_api() mail_server = get_settings('mail_server') if get_settings('copy_status') == 1: now = datetime.now().strftime("%m-%d-%Y %H:%M") cursor = conn.cursor() req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) i = 0 count_alias = 0 cursor = conn.cursor() for data in remoteData: cursor.execute('SELECT count(*) FROM aliases where alias like ? and goto like ?', (remoteData[i]['address'],remoteData[i]['goto'],)) count = cursor.fetchone()[0] if count >= 1 : i+=1 else: cursor.execute('INSERT INTO aliases values(?,?,?,?)', (remoteData[i]['id'], remoteData[i]['address'],remoteData[i]['goto'],now)) count_alias+=1 i+=1 conn.commit() if count_alias > 0: logging.info(now + ' - Info : Local DB updated with %s new aliases from %s ' %(str(count_alias),mail_server)) print('\n[b]Info[/b] : Local DB update with %s new aliases from %s \n' %(str(count_alias),mail_server)) else: print('\n[b]Info[/b] : No missing aliases from local DB \n') def checklist(alias): alias_exist = None apikey = get_api() mail_server = get_settings('mail_server') req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) i = 0 for search in remoteData: if alias == remoteData[i]['address'] || alias in remoteData[i]['goto']: alias_exist = True i=i+1 cursor = conn.cursor() cursor.execute('SELECT count(*) FROM aliases where alias == ? or goto == ?', (alias,alias,)) count = cursor.fetchone()[0] if count >= 1 : alias_exist = True return alias_exist def list_alias(): apikey = get_api() cursor = conn.cursor() mail_server = get_settings('mail_server') req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) i = 0 l = 0 print('\n[b]malias[/b] - All aliases on %s ([b]*[/b] also in local db)' %(mail_server)) print('==================================================================') for search in remoteData: the_alias = remoteData[i]['address'].ljust(20,' ') the_goto = remoteData[i]['goto'].ljust(20,' ') cursor.execute('SELECT count(*) FROM aliases where alias like ? or goto like ?', (remoteData[i]['address'],remoteData[i]['address'],)) count = cursor.fetchone()[0] if count >= 1: print(the_alias + '\tgoes to\t\t' + the_goto + '\t[b]*[/b]') l=l+1 else: print(the_alias + '\tgoes to\t\t' + the_goto) i=i+1 print('\n\nTotal number of aliases %s on instance [b]%s[/b] and %s on [b]local DB[/b].' %(str(i),mail_server,str(l))) def alias_id(alias): apikey = get_api() mail_server = get_settings('mail_server') req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) i = 0 for search in remoteData: if remoteData[i]['address'] == alias: return remoteData[i]['id'] i=i+1 return None def get_last_timed(username): apikey = get_api() mail_server = get_settings('mail_server') req = urllib.request.Request('https://'+mail_server+'/api/v1/get/time_limited_aliases/%s' %username) req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) remoteData.sort(key = itemgetter('validity'), reverse=True) return (remoteData[0]) def number_of_aliases_in_db(): cursor = conn.cursor() cursor.execute('SELECT count(*) FROM aliases') count = cursor.fetchone()[0] return count def number_of_aliases_on_server(): apikey = get_api() mail_server = get_settings('mail_server') req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) return len(remoteData) def check_local_db(alias_id): cursor = conn.cursor() cursor.execute('SELECT count(*) FROM aliases where id = ?',(alias_id,)) count = cursor.fetchone()[0] return count def search(alias): apikey = get_api() mail_server = get_settings('mail_server') alias_server = number_of_aliases_on_server() alias_db = number_of_aliases_in_db() cursor = conn.cursor() cursor.execute('SELECT data_copy FROM settings where id = 1') result = cursor.fetchone()[0] if result == 1: search_term = '%'+alias+'%' cursor.execute('SELECT * from aliases where alias like ? or goto like ?',(search_term,search_term,)) remoteData = cursor.fetchall() i = 0 print('\nAliases on %s that contains [b]%s[/b]' %(mail_server,alias)) print('=================================================================') for search in remoteData: the_alias = remoteData[i][1].ljust(20,' ') print(the_alias + '\tgoes to\t\t' + remoteData[i][2]) i=i+1 print('\n\nData from local DB') else: req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) i = 0 print('\nAliases on %s that contains [b]%s[/b]' %(mail_server,alias)) print('=================================================') for search in remoteData: if alias in remoteData[i]['address'] or alias in remoteData[i]['goto']: print(remoteData[i]['address'] + '\tgoes to\t\t' + remoteData[i]['goto']) i=i+1 print('\n\nData from server') if alias_server != alias_db: print('\n\nThere are %s aliases on the server and %s aliases in local DB' %(str(alias_server),str(alias_db))) print('Run [i]malias -c[/i] to update local DB') def get_mailcow_version(): apikey = get_api() mail_server = get_settings('mail_server') req = urllib.request.Request('https://'+mail_server+'/api/v1/get/status/version') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.loads(remote) remote_heads = git.cmd.Git().ls_remote('https://github.com/mailcow/mailcow-dockerized', tags=True) tags = remote_heads.splitlines() for x in tags: string = x.rsplit('/',2) if remoteData['version'] != string[2]: versionInfo = 'Your Mailcow version is %s and the latest is %s' %(remoteData['version'], string[2]) else: versionInfo = 'You have the latest Mailcow version %s' %remoteData['version'] return (versionInfo) def show_current_info(): API = get_api() mail_server = get_settings('mail_server') latest_release = get_latest_release() if API == 'DUMMY_KEY': API = 'Missing API Key!' if mail_server == 'dummy.server': mail_server = 'Missing address to mailcow instance!' aliases_server = number_of_aliases_on_server() alias_db = number_of_aliases_in_db() mailcow_version = get_mailcow_version() mail_domains = get_mail_domains(False) domain = "" i=0 for domains in mail_domains: if i!=0: domain = domain + ', ' + str(mail_domains[i]['domain_name']) else: domain = domain + str(mail_domains[i]['domain_name']) i+=1 print('\n[b]malias[/b] - Manage aliases on mailcow Instance.') print('===================================================') print('API key : [b]%s[/b]' % (API)) print('Mailcow Instance : [b]%s[/b]' % (mail_server)) print('Active domains : [b]%s[/b]' % (domain)) print('Mailcow version : [b]%s[/b]' % (mailcow_version)) print('Logfile : [b]%s[/b]' % (logfile)) print('Databse : [b]%s[b]' % (database)) print('Aliases on server : [b]%s[/b]' % (aliases_server)) print('Aliases in DB : [b]%s[/b]' % (alias_db)) print('') if app_version[:5] != latest_release: print('App version : [b]%s[/b] a new version (%s) is available @ https://iurl.no/malias' % (app_version,latest_release)) else: print('App version : [b]%s[/b]' % (app_version)) print('') def export_data(): apikey = get_api() mail_server = get_settings('mail_server') req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all') req.add_header('Content-Type', 'application/json') req.add_header('X-API-Key', apikey) current = urllib.request.urlopen(req) remote = current.read().decode('utf-8') remoteData = json.dumps(json.loads(remote),indent=4) with open("alias.json", "w") as outfile: outfile.write(remoteData) def import_data(file): apikey=get_api() mailserver = get_settings('mail_server') active_domains = get_mail_domains(False) with open(file,'r') as mail_alias: json_object = json.load(mail_alias) domain_list = [] active_domain_list = [] for data in json_object: domain = data['domain'] if domain not in domain_list: domain_list.append(domain) for data in active_domains: active_domain_list.append(data['domain_name']) diff = list(set(domain_list) - set(active_domain_list)) if len(diff) != 0: missing = ', '.join(diff) print ('Please add the domains %s to your mailcow instance' % (missing)) sys.exit(1) else: print('OK') def updatedb(): # Function for updatimg DB when we have to # 09.04.24 # Added DB tempalias table # Added DB version table cursor = conn.cursor() cursor.execute('''CREATE TABLE IF NOT EXISTS dbversion (version integer NOT NULL DEFAULT 0)''') cursor.execute('SELECT COUNT(version) FROM dbversion') count = cursor.fetchone()[0] if count == 0: cursor.execute('INSERT INTO dbversion values(?)', (db_version,)) conn.execute('''CREATE TABLE IF NOT EXISTS timedaliases (id integer NOT NULL PRIMARY KEY, alias text NOT NULL, goto text NOT NULL, validity text NOT NULL)''') conn.commit() conn = connect_database() updatedb() # Check if db needs updating and do the updating. parser = argparse.ArgumentParser(prog='malias', description='This is a simple application to help you create and delete aliases on a mailcow instance.\nIf you find any issues or would like to submit a PR - please head over to https://gitlab.pm/rune/malias. \n\nI hope this makes your mailcow life a bit easier!', formatter_class=RawTextHelpFormatter, epilog='Making mailcow easier...') parser.add_argument('-k', '--api', help='Add/Change API key.\n\n', nargs=1, metavar=('APIkey'), required=False, action="append") parser.add_argument('-s', '--search', help='Search for alias.\n\n', nargs=1, metavar=('alias@domain.com'), required=False, action="append") parser.add_argument('-m', '--server', help='Add/Uppdate mailcow instance.\n\n', nargs=1, metavar=('mailcow-server.tld'), required=False, action="append") parser.add_argument('-a', '--add', help='Add new alias.\n\n', nargs=2, metavar=('alias@domain.com', 'to@domain.com'), required=False, action="append") parser.add_argument('-t', '--timed', help='Add new time limited alias for user on domain. One year validity\n\n', nargs=2, metavar=('user@domain.com', 'domain.com'), required=False, action="append") parser.add_argument('-d', '--delete', help='Delete alias.\n\n', nargs=1, metavar=('alias@domain.com'), required=False, action="append") # parser.add_argument('-i', '--import', help='Show current config and appliacation info\n\n', # nargs=1, metavar=('alias.json'), required=False, action="append") parser.add_argument('-v', '--version', help='Show current version and information\n\n', required=False, action='store_true') parser.add_argument('-c', '--copy', help='Copy alias data from mailcow server to local DB.\n\n', required=False, action='store_true') parser.add_argument('-l', '--list', help='List all aliases on the Mailcow instance.\n\n', required=False, action='store_true') parser.add_argument('-o', '--domains', help='List all mail domains on the Mailcow instance.\n\n', required=False, action='store_true') args = vars(parser.parse_args()) if args['api']: apikey(args['api'][0][0]) elif args['search']: search(args['search'][0][0]) elif args['server']: set_mailserver(args['server'][0][0]) elif args['add']: create(args['add'][0][0],args['add'][0][1]) elif args['timed']: create_timed(args['timed'][0][0],args['timed'][0][1]) elif args['delete']: delete_alias(args['delete'][0][0]) elif args['import']: import_data(args['import'][0][0]) elif args['version']: show_current_info() elif args['copy']: copy_data() elif args['list']: list_alias() elif args['domains']: get_mail_domains(True) else: print('\n\nEh, sorry! I need something more to help you! If you write [b]malias -h[/b] I\'ll show a help screen to get you going!!!\n\n\n')