malias/malias.py
2024-05-21 12:37:11 +02:00

670 lines
23 KiB
Python

#!/usr/bin/python3
import sqlite3
from pathlib import Path
from sqlite3 import Error
import urllib.request
import json
import logging
import argparse
import requests
import os
import time
import sys
import git
from types import SimpleNamespace
from datetime import datetime
from string import ascii_letters, digits
from rich import print
from argparse import RawTextHelpFormatter
from urllib.request import urlopen
from operator import itemgetter
# Info pages for dev
# https://mailcow.docs.apiary.io/#reference/aliases/get-aliases/get-aliases
# https://demo.mailcow.email/api/#/Aliases
# Per-user working directory that holds both the SQLite database and the log.
homefilepath = Path.home()
filepath = homefilepath / '.config' / 'malias'
database = filepath / 'malias.db'
logfile = filepath / 'malias.log'
# Create the directory first: logging writes its file inside it.
filepath.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    filename=logfile,
    level=logging.INFO,
    format='%(message)s',
)
# Application version and local database schema version.
app_version = '0.4'
db_version = '0.2'
def get_latest_release():
    """Return the ``tag_name`` of the latest malias release.

    Requests the "latest release" endpoint on gitlab.pm and decodes the JSON
    reply.
    """
    request = urllib.request.Request(
        'https://gitlab.pm/api/v1/repos/rune/malias/releases/latest',
        headers={'Content-Type': 'application/json'},
    )
    reply = urllib.request.urlopen(request)
    release = json.loads(reply.read().decode('utf-8'))
    return release['tag_name']
def release_check():
    """Print whether a release other than the running version is published.

    Compares ``app_version`` with the tag returned by ``get_latest_release()``.
    """
    latest_release = get_latest_release()
    if app_version != latest_release:
        # Markup tags are closed in reverse order of opening.
        print('[b]New version available @ [i]https://iurl.no/malias[/i][/b]')
    else:
        print('You have the latest version. Version: %s' % (app_version))
def connect_database():
    """Open the local SQLite database and make sure its tables exist.

    Creates the configuration directory when missing, connects to
    ``database``, creates the ``apikey``, ``settings``, ``aliases`` and
    ``timedaliases`` tables if they do not exist yet, and lets ``first_run()``
    seed the placeholder rows.

    Returns:
        The open ``sqlite3.Connection``, or ``None`` when connecting failed
        (the error is logged and printed).
    """
    Path(filepath).mkdir(parents=True, exist_ok=True)
    try:
        conn = sqlite3.connect(database)
    except Error as e:
        logging.error(time.strftime("%Y-%m-%d %H:%M") + ' - Error : ' + str(e))
        print(e)
        return None
    # Schema setup is kept out of any ``finally`` clause so that an error
    # raised while creating tables can never be masked by a return statement.
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS apikey
                    (id integer NOT NULL PRIMARY KEY,
                    api text NOT NULL)''')
    c.execute('''CREATE TABLE IF NOT EXISTS settings (
                    id INTEGER NOT NULL PRIMARY KEY,
                    first_run INTEGER,
                    server TEXT,
                    data_copy INTEGER
                    )''')
    c.execute('''CREATE TABLE IF NOT EXISTS aliases
                    (id integer NOT NULL PRIMARY KEY,
                    alias text NOT NULL,
                    goto text NOT NULL,
                    created text NOT NULL)''')
    c.execute('''CREATE TABLE IF NOT EXISTS timedaliases
                    (id integer NOT NULL PRIMARY KEY,
                    alias text NOT NULL,
                    goto text NOT NULL,
                    validity text NOT NULL)''')
    conn.commit()
    first_run(conn)
    return conn
def first_run(conn):
    """Seed the database with placeholder rows when it is still empty.

    If the ``settings`` table has no rows, inserts the settings row
    ``(1, 1, 'dummy.server', 0)`` and the apikey row ``(1, 'DUMMY_KEY')`` and
    commits. Does nothing when a settings row already exists.

    Args:
        conn: Open sqlite3 connection whose ``settings`` and ``apikey``
            tables already exist.

    Returns:
        Always ``None``.
    """
    cursor = conn.cursor()
    cursor.execute('SELECT count(*) FROM settings')
    if cursor.fetchone()[0] != 0:
        return None
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    # A first start is a normal event, so it is logged at INFO like the other
    # status messages in this file rather than as an error.
    logging.info(now + ' - First run!')
    cursor.execute('INSERT INTO settings values(?,?,?,?)', (1, 1, 'dummy.server', 0))
    cursor.execute('INSERT INTO apikey values(?,?)', (1, 'DUMMY_KEY'))
    conn.commit()
    return None
def get_settings(kind):
    """Return one value from the first row of the ``settings`` table.

    Args:
        kind: ``'mail_server'``, ``'first_run_status'`` or ``'copy_status'``.

    Returns:
        The matching column value (``server``, ``first_run`` or
        ``data_copy``); ``None`` for any other ``kind``.

    Exits the program with status 1 when the mail server is requested while
    it still holds the placeholder value ``'dummy.server'``.
    """
    cursor = conn.cursor()
    cursor.execute('SELECT first_run, server, data_copy FROM settings')
    first_run_status, mail_server, copy_status = cursor.fetchone()
    if kind == 'mail_server':
        if mail_server == 'dummy.server':
            print('Error: No mailcow server active. Please add one with [b]malias -m [i]your.server[/i][/b]')
            # A missing server is a failure, so report a non-zero exit status.
            sys.exit(1)
        return mail_server
    if kind == 'first_run_status':
        return first_run_status
    if kind == 'copy_status':
        return copy_status
    return None
def get_api():
    """Return the stored mailcow API key.

    Reads the ``api`` column of the ``apikey`` table. Exits the program with
    status 1 when the key is still the placeholder ``'DUMMY_KEY'``.
    """
    # No release lookup here: the previous version fetched the latest release
    # on every call and never used the result.
    cursor = conn.cursor()
    cursor.execute('SELECT api FROM apikey')
    key = cursor.fetchone()[0]
    if key == 'DUMMY_KEY':
        print('Missing API key. Please add with [b]malias -k [i]YOUR-API-KEY[/i][/b]')
        sys.exit(1)
    return key
def set_mailserver(server):
    """Store ``server`` as the mailcow host in settings row 1.

    Logs the change, prints a confirmation and commits.
    """
    timestamp = datetime.now().strftime("%m-%d-%Y %H:%M")
    db_cursor = conn.cursor()
    db_cursor.execute('UPDATE settings SET server = ? WHERE id = 1', (server,))
    logging.info(timestamp + ' - Info : mailcow server updated')
    print('Your mail server has been updated.')
    conn.commit()
def apikey(key):
    """Store ``key`` as the API key in apikey row 1.

    Logs the change, prints a confirmation and commits.
    """
    timestamp = datetime.now().strftime("%m-%d-%Y %H:%M")
    db_cursor = conn.cursor()
    db_cursor.execute('UPDATE apikey SET api = ? WHERE id = 1', (key,))
    logging.info(timestamp + ' - Info : API key updated')
    print('Your API key has been updated.')
    conn.commit()
def get_mail_domains(info):
    """Fetch all mail domains from the mailcow server.

    Args:
        info: When truthy, print every domain with the number of local-DB
            aliases whose alias or goto contains the domain name, followed by
            the totals; nothing is returned. When falsy, return the decoded
            API response instead.
    """
    api_key = get_api()
    cursor = conn.cursor()
    mail_server = get_settings('mail_server')
    request = urllib.request.Request('https://'+mail_server+'/api/v1/get/domain/all')
    request.add_header('Content-Type', 'application/json')
    request.add_header('X-API-Key', api_key)
    reply = urllib.request.urlopen(request)
    domains = json.loads(reply.read().decode('utf-8'))
    if not info:
        return(domains)
    total_aliases = 0
    print('\n[b]malias[/b] - All email domains on %s' %(mail_server))
    print('==================================================================')
    for entry in domains:
        pattern = '%'+entry['domain_name']+'%'
        cursor.execute('SELECT count(*) FROM aliases where alias like ? or goto like ?', (pattern, pattern,))
        matches = cursor.fetchone()[0]
        total_aliases += matches
        print('%s \t\twith %s aliases' %(entry['domain_name'], matches))
    print('\n\nThere is a total of %s domains with %s aliases.' %(str(len(domains)), str(total_aliases)))
def create(alias,to_address):
    """Create ``alias`` pointing at ``to_address`` on the mailcow server.

    Refuses (and exits with status 1) when ``checklist()`` reports that the
    alias already exists. Otherwise posts the alias to the server, looks up
    its id with ``alias_id()`` and, when an id is found, records the alias in
    the local ``aliases`` table.

    Args:
        alias: Address of the alias to create.
        to_address: Destination address the alias forwards to.
    """
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    server = get_settings('mail_server')
    api_key = get_api()
    if checklist(alias):
        logging.error(now + ' - Error : alias %s exists on the mailcow instance %s ' %(alias,server))
        print('\n[b]Error[/b] : alias %s exists on the mailcow instance %s \n' %(alias,server))
        sys.exit(1)
    data = {'address': alias,'goto': to_address,'active': "1"}
    headers = {'X-API-Key': api_key, "Content-Type": "application/json"}
    requests.post('https://'+server+'/api/v1/add/alias',
                  data=json.dumps(data), headers=headers)
    # Success is verified by asking the server for the id of the new alias.
    mail_id = alias_id(alias)
    if mail_id is None:
        logging.error(now + ' - Error : alias %s not created on the mailcow instance %s ' %(alias,server))
        # The printed text now matches the log: the alias was NOT created.
        print('\n[b]Error[/b] : alias %s not created on the mailcow instance %s \n' %(alias,server))
        return
    cursor = conn.cursor()
    cursor.execute('INSERT INTO aliases values(?,?,?,?)', (mail_id, alias,to_address,now))
    conn.commit()
    logging.info(now + ' - Info : alias %s created for %s on the mailcow instance %s ' %(alias,to_address,server))
    print('\n[b]Info[/b] : alias %s created for %s on the mailcow instance %s \n' %(alias,to_address,server))
def create_timed(username,domain):
    """Create a time-limited alias for ``username`` on ``domain``.

    Posts the request to the mailcow server. When the server answers with
    type ``'danger'`` an error is printed and nothing is stored. Otherwise the
    newest timed alias of the user is fetched with ``get_last_timed()``,
    reported, written to the local ``timedaliases`` table and logged.

    Args:
        username: Mailbox the timed alias is created for.
        domain: Domain the timed alias is created on.
    """
    # Function-scope import: the file only imports ``datetime`` from the module.
    from datetime import timezone

    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    server = get_settings('mail_server')
    api_key = get_api()
    data = {'username': username,'domain': domain}
    headers = {'X-API-Key': api_key, "Content-Type": "application/json"}
    response = requests.post('https://'+server+'/api/v1/add/time_limited_alias',
                             data=json.dumps(data), headers=headers)
    result = response.json()[0]
    if result['type'] == 'danger':
        if result['msg'] == 'domain_invalid':
            print('\n[b]Error[/b] : the domain %s is invalid for %s \n' %(domain, server))
        elif result['msg'] == 'access_denied':
            print('\n[b]Error[/b] : the server %s responded with [red]Access Denied[/red]\n' %(server))
        else:
            # Any other failure used to pass silently; show what the server said.
            print('\n[b]Error[/b] : the server %s responded with %s \n' %(server, result['msg']))
        return
    alias = get_last_timed(username)
    # Timezone-aware replacement for the deprecated datetime.utcfromtimestamp().
    validity = datetime.fromtimestamp(alias['validity'], tz=timezone.utc).strftime('%d-%m-%Y %H:%M')
    print('The timed alias %s was created. The alias is valid until %s UTC\n' %(alias['address'], validity))
    cursor = conn.cursor()
    # The validity timestamp doubles as the row id.
    cursor.execute('INSERT INTO timedaliases values(?,?,?,?)', (alias['validity'],alias['address'],username,validity))
    conn.commit()
    logging.info(now + ' - Info : timed alias %s created for %s and valid to %s UTC on the mailcow instance %s ' %(alias['address'],username,validity,server))
def delete_alias(alias):
    """Delete ``alias`` from the mailcow server and from the local DB.

    The alias must be reported by ``checklist()`` and resolve to an id through
    ``alias_id()``; otherwise a "not found" error is printed. On a successful
    server reply the matching local row (if any) is removed as well. Every
    outcome is logged and printed.

    Args:
        alias: Address of the alias to delete.
    """
    # Taken up front so every log call below has a timestamp available.
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    server = get_settings('mail_server')
    api_key = get_api()
    the_alias_id = alias_id(alias) if checklist(alias) else None
    if the_alias_id is None:
        # Covers both "unknown alias" and "known only as a destination", so a
        # missing id is never sent to the server.
        print('\n[b]Error[/b] : The alias %s not found' %(alias))
        return
    data = {'id': the_alias_id}
    headers = {'X-API-Key': api_key, "Content-Type": "application/json"}
    response = requests.post('https://'+server+'/api/v1/delete/alias',
                             data=json.dumps(data), headers=headers)
    response_data = response.json()
    if response_data[0]['type'] != 'success':
        logging.error(now + ' - Error : alias %s NOT deleted from the mailcow instance %s. Error : %s' %(alias,server,response_data))
        print('\n[b]Error[/b] : alias %s NOT deleted from the mailcow instance %s. Error : %s' %(alias,server,response_data))
        return
    if check_local_db(the_alias_id) == 1:
        cursor = conn.cursor()
        cursor.execute('DELETE from aliases where id = ?',(the_alias_id,))
        conn.commit()
        logging.info(now + ' - Info : alias %s deleted from the mailcow instance %s and Local DB' %(alias,server))
        print('\n[b]Info[/b] : alias %s deleted from the mailcow instance %s and local DB' %(alias,server))
    else:
        logging.info(now + ' - Info : alias %s deleted from the mailcow instance %s.' %(alias,server))
        print('\n[b]Info[/b] : alias %s deleted from the mailcow instance %s.' %(alias,server))
def copy_data():
    """Import all aliases from the mailcow server into the local DB.

    When the ``copy_status`` setting is 0, every alias returned by the server
    is written to the ``aliases`` table and ``data_copy`` is set to 1. When
    the import has already happened, a notice is printed and ``update_data()``
    adds whatever is missing.
    """
    api_key = get_api()
    mail_server = get_settings('mail_server')
    if get_settings('copy_status') != 0:
        print('\n[b]Info[/b] : aliases already imported from the mailcow instance %s to local DB\n\n[i]Updating with any missing aliases![/i]' %(mail_server))
        update_data()
        return
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    cursor = conn.cursor()
    req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', api_key)
    current = urllib.request.urlopen(req)
    remote_aliases = json.loads(current.read().decode('utf-8'))
    for entry in remote_aliases:
        # OR IGNORE: a row with this id may already exist because create()
        # stores new aliases locally before the first import.
        cursor.execute('INSERT OR IGNORE INTO aliases values(?,?,?,?)', (entry['id'], entry['address'], entry['goto'], now))
    cursor.execute('UPDATE settings SET data_copy = ? WHERE id = 1',(1,))
    conn.commit()
    logging.info(now + ' - Info : aliases imported from the mailcow instance %s to local DB' %(mail_server))
    print('\n[b]Info[/b] : aliases imported from the mailcow instance %s to local DB\n' %(mail_server))
def update_data():
    """Add aliases that exist on the server but not in the local DB.

    Runs only when the ``copy_status`` setting is 1. Each server alias whose
    exact address/goto pair is not yet stored locally is written to the
    ``aliases`` table; the number of rows written is logged and printed.
    """
    api_key = get_api()
    mail_server = get_settings('mail_server')
    if get_settings('copy_status') != 1:
        return
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', api_key)
    current = urllib.request.urlopen(req)
    remote_aliases = json.loads(current.read().decode('utf-8'))
    count_alias = 0
    cursor = conn.cursor()
    for entry in remote_aliases:
        # Exact comparison: with LIKE, "_" and "%" inside an address would be
        # treated as wildcards and could hide a missing alias.
        cursor.execute('SELECT count(*) FROM aliases where alias = ? and goto = ?', (entry['address'], entry['goto'],))
        if cursor.fetchone()[0] >= 1:
            continue
        # OR REPLACE: the id may already be stored with an outdated goto.
        cursor.execute('INSERT OR REPLACE INTO aliases values(?,?,?,?)', (entry['id'], entry['address'], entry['goto'], now))
        count_alias += 1
    conn.commit()
    if count_alias > 0:
        logging.info(now + ' - Info : Local DB updated with %s new aliases from %s ' %(str(count_alias),mail_server))
        print('\n[b]Info[/b] : Local DB updated with %s new aliases from %s \n' %(str(count_alias),mail_server))
    else:
        print('\n[b]Info[/b] : No missing aliases from local DB \n')
def checklist(alias):
    """Report whether ``alias`` is already known on the server or locally.

    An alias counts as existing when it equals the address of a server alias,
    is contained in the goto of a server alias, or equals the alias or goto of
    a row in the local ``aliases`` table.

    Args:
        alias: Address to look for.

    Returns:
        ``True`` when the alias exists, otherwise ``None``.
    """
    alias_exist = None
    api_key = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', api_key)
    current = urllib.request.urlopen(req)
    remote_aliases = json.loads(current.read().decode('utf-8'))
    # Python spells logical OR as "or"; "||" is a syntax error.
    if any(alias == entry['address'] or alias in entry['goto'] for entry in remote_aliases):
        alias_exist = True
    cursor = conn.cursor()
    cursor.execute('SELECT count(*) FROM aliases where alias == ? or goto == ?', (alias,alias,))
    if cursor.fetchone()[0] >= 1:
        alias_exist = True
    return alias_exist
def list_alias():
    """Print every alias on the mailcow server.

    Aliases whose address also matches a row in the local ``aliases`` table
    are marked with ``*``; a summary line with both counts follows the list.
    """
    api_key = get_api()
    cursor = conn.cursor()
    mail_server = get_settings('mail_server')
    request = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    request.add_header('Content-Type', 'application/json')
    request.add_header('X-API-Key', api_key)
    reply = urllib.request.urlopen(request)
    entries = json.loads(reply.read().decode('utf-8'))
    in_local_db = 0
    print('\n[b]malias[/b] - All aliases on %s ([b]*[/b] also in local db)' %(mail_server))
    print('==================================================================')
    for entry in entries:
        address = entry['address']
        line = address.ljust(20,' ') + '\tgoes to\t\t' + entry['goto'].ljust(20,' ')
        cursor.execute('SELECT count(*) FROM aliases where alias like ? or goto like ?', (address, address,))
        if cursor.fetchone()[0] >= 1:
            print(line + '\t[b]*[/b]')
            in_local_db += 1
        else:
            print(line)
    print('\n\nTotal number of aliases %s on instance [b]%s[/b] and %s on [b]local DB[/b].' %(str(len(entries)),mail_server,str(in_local_db)))
def alias_id(alias):
    """Return the server-side id of ``alias``.

    Fetches all aliases from the mailcow server and returns the ``id`` of the
    first one whose address equals ``alias``; ``None`` when none matches.
    """
    api_key = get_api()
    mail_server = get_settings('mail_server')
    request = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    request.add_header('Content-Type', 'application/json')
    request.add_header('X-API-Key', api_key)
    reply = urllib.request.urlopen(request)
    entries = json.loads(reply.read().decode('utf-8'))
    for entry in entries:
        if entry['address'] == alias:
            return entry['id']
    return None
def get_last_timed(username):
    """Return the time-limited alias of ``username`` with the highest validity.

    Fetches the user's time-limited aliases from the mailcow server, orders
    them by ``validity`` in descending order and returns the first entry.
    """
    api_key = get_api()
    mail_server = get_settings('mail_server')
    endpoint = '/api/v1/get/time_limited_aliases/%s' % username
    request = urllib.request.Request('https://' + mail_server + endpoint)
    request.add_header('Content-Type', 'application/json')
    request.add_header('X-API-Key', api_key)
    reply = urllib.request.urlopen(request)
    timed_aliases = json.loads(reply.read().decode('utf-8'))
    newest_first = sorted(timed_aliases, key=itemgetter('validity'), reverse=True)
    return newest_first[0]
def number_of_aliases_in_db():
    """Return the number of rows in the local ``aliases`` table."""
    row = conn.cursor().execute('SELECT count(*) FROM aliases').fetchone()
    return row[0]
def number_of_aliases_on_server():
    """Return how many entries the server's "all aliases" endpoint reports."""
    api_key = get_api()
    mail_server = get_settings('mail_server')
    request = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    request.add_header('Content-Type', 'application/json')
    request.add_header('X-API-Key', api_key)
    reply = urllib.request.urlopen(request)
    entries = json.loads(reply.read().decode('utf-8'))
    return len(entries)
def check_local_db(alias_id):
    """Return how many local ``aliases`` rows have the id ``alias_id``."""
    db_cursor = conn.cursor()
    db_cursor.execute('SELECT count(*) FROM aliases where id = ?', (alias_id,))
    (matches,) = db_cursor.fetchone()
    return matches
def search(alias):
    """Print all aliases whose address or destination contains ``alias``.

    When the ``data_copy`` setting is 1 the local ``aliases`` table is
    searched; otherwise the list is fetched from the mailcow server and
    filtered here. A hint to run ``malias -c`` is printed when the server and
    the local DB hold a different number of aliases.
    """
    api_key = get_api()
    mail_server = get_settings('mail_server')
    alias_server = number_of_aliases_on_server()
    alias_db = number_of_aliases_in_db()
    cursor = conn.cursor()
    cursor.execute('SELECT data_copy FROM settings where id = 1')
    data_copied = cursor.fetchone()[0]
    if data_copied == 1:
        pattern = '%'+alias+'%'
        cursor.execute('SELECT * from aliases where alias like ? or goto like ?',(pattern,pattern,))
        rows = cursor.fetchall()
        print('\nAliases on %s that contains [b]%s[/b]' %(mail_server,alias))
        print('=================================================================')
        for row in rows:
            # Column 1 is the alias, column 2 its destination.
            print(row[1].ljust(20,' ') + '\tgoes to\t\t' + row[2])
        print('\n\nData from local DB')
    else:
        request = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
        request.add_header('Content-Type', 'application/json')
        request.add_header('X-API-Key', api_key)
        reply = urllib.request.urlopen(request)
        entries = json.loads(reply.read().decode('utf-8'))
        print('\nAliases on %s that contains [b]%s[/b]' %(mail_server,alias))
        print('=================================================')
        for entry in entries:
            if alias in entry['address'] or alias in entry['goto']:
                print(entry['address'] + '\tgoes to\t\t' + entry['goto'])
        print('\n\nData from server')
    if alias_server != alias_db:
        print('\n\nThere are %s aliases on the server and %s aliases in local DB' %(str(alias_server),str(alias_db)))
        print('Run [i]malias -c[/i] to update local DB')
def get_mailcow_version():
    """Return a sentence comparing the server's mailcow version with the latest tag.

    The running version comes from the server's status/version endpoint. The
    reference version is the last tag listed by ``git ls-remote --tags`` for
    the mailcow-dockerized repository.

    Returns:
        A human-readable status string.
    """
    api_key = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://'+mail_server+'/api/v1/get/status/version')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', api_key)
    current = urllib.request.urlopen(req)
    status = json.loads(current.read().decode('utf-8'))
    remote_heads = git.cmd.Git().ls_remote('https://github.com/mailcow/mailcow-dockerized', tags=True)
    tags = remote_heads.splitlines()
    if not tags:
        # Without any tag there is nothing to compare against.
        return 'Your Mailcow version is %s (the latest version could not be determined)' %status['version']
    # The last listed tag is taken as the latest release.
    # NOTE(review): presumably relies on git listing the date-named tags in
    # ascending order - confirm.
    latest = tags[-1].rsplit('/',2)[2]
    if status['version'] != latest:
        return 'Your Mailcow version is %s and the latest is %s' %(status['version'], latest)
    return 'You have the latest Mailcow version %s' %status['version']
def show_current_info():
    """Print an overview of the current configuration and versions.

    Shows the API key, mailcow host, active domains, mailcow version, log and
    database paths, alias counts on the server and in the local DB, and the
    application version (with a notice when the latest release tag differs).
    """
    API = get_api()
    mail_server = get_settings('mail_server')
    latest_release = get_latest_release()
    if API == 'DUMMY_KEY':
        API = 'Missing API Key!'
    if mail_server == 'dummy.server':
        mail_server = 'Missing address to mailcow instance!'
    aliases_server = number_of_aliases_on_server()
    alias_db = number_of_aliases_in_db()
    mailcow_version = get_mailcow_version()
    mail_domains = get_mail_domains(False)
    domain = ', '.join(str(entry['domain_name']) for entry in mail_domains)
    print('\n[b]malias[/b] - Manage aliases on Mailcow instance.')
    print('===================================================')
    print("API key\t\t\t: [b]%s[/b]" % (API))
    print("Mailcow Instance\t: [b]%s[/b]" % (mail_server))
    print("Active domains\t\t: [b]%s[/b]" % (domain))
    print("Mailcow version\t\t: [b]%s[/b]" % (mailcow_version))
    print("Logfile\t\t\t: [b]%s[/b]" % (logfile))
    # The bold tag is now closed with [/b] and the label is spelled correctly.
    print("Database\t\t: [b]%s[/b]" % (database))
    print("Aliases on server\t: [b]%s[/b]" % (aliases_server))
    print("Aliases in DB\t\t: [b]%s[/b]" % (alias_db))
    print("")
    if app_version[:3] != latest_release:
        print(
            "App version\t\t\t\t: [b]%s[/b] a new version (%s) is available @ https://iurl.no/malias"
            % (app_version, latest_release)
        )
    else:
        print("App version\t\t: [b]%s[/b]" % (app_version))
    print('')
def export_data():
    """Write all server aliases to ``alias.json`` in the working directory.

    The server reply is re-serialised as JSON with an indent of 4.
    """
    api_key = get_api()
    mail_server = get_settings('mail_server')
    request = urllib.request.Request('https://'+mail_server+'/api/v1/get/alias/all')
    request.add_header('Content-Type', 'application/json')
    request.add_header('X-API-Key', api_key)
    reply = urllib.request.urlopen(request)
    payload = reply.read().decode('utf-8')
    pretty = json.dumps(json.loads(payload), indent=4)
    with open("alias.json", "w") as outfile:
        outfile.write(pretty)
def import_data(file):
    """Check that every domain used in an alias export exists on the server.

    Loads the JSON file ``file``, collects the ``domain`` of each entry and
    compares the set with the server's domains. Prints ``OK`` when none is
    missing; otherwise names the missing domains and exits with status 1.

    Args:
        file: Path of the JSON alias file to read.
    """
    api_key = get_api()
    mailserver = get_settings('mail_server')
    active_domains = get_mail_domains(False)
    with open(file,'r') as mail_alias:
        json_object = json.load(mail_alias)
    # Unique domains of the file, in order of first appearance.
    domain_list = list(dict.fromkeys(entry['domain'] for entry in json_object))
    active_domain_list = [entry['domain_name'] for entry in active_domains]
    diff = list(set(domain_list) - set(active_domain_list))
    if len(diff) == 0:
        print('OK')
        return
    missing = ', '.join(diff)
    print ('Please add the domains %s to your mailcow instance' % (missing))
    sys.exit(1)
def updatedb():
    """Bring an existing database up to the current schema.

    Creates the ``dbversion`` table when missing and stores ``db_version`` in
    it if it holds no version yet, then makes sure the ``timedaliases`` table
    exists and commits.
    """
    # Schema history:
    # 09.04.24 - added the timedaliases table and the dbversion table.
    db_cursor = conn.cursor()
    db_cursor.execute('''CREATE TABLE IF NOT EXISTS dbversion
(version integer NOT NULL DEFAULT 0)''')
    db_cursor.execute('SELECT COUNT(version) FROM dbversion')
    stored_versions = db_cursor.fetchone()[0]
    if stored_versions == 0:
        db_cursor.execute('INSERT INTO dbversion values(?)', (db_version,))
    conn.execute('''CREATE TABLE IF NOT EXISTS timedaliases
(id integer NOT NULL PRIMARY KEY,
alias text NOT NULL,
goto text NOT NULL,
validity text NOT NULL)''')
    conn.commit()
# Script entry: open the database, parse the command line and dispatch to the
# function that implements the first option given.
conn = connect_database()
if conn is None:
    # connect_database() has already logged and printed the reason.
    sys.exit(1)
updatedb() # Check if db needs updating and do the updating.
parser = argparse.ArgumentParser(prog='malias',
    description='This is a simple application to help you create and delete aliases on a mailcow instance.\nIf you find any issues or would like to submit a PR - please head over to https://gitlab.pm/rune/malias. \n\nI hope this makes your mailcow life a bit easier!',
    formatter_class=RawTextHelpFormatter,
    epilog='Making mailcow easier...')
parser.add_argument('-k', '--api', help='Add/Change API key.\n\n',
    nargs=1, metavar=('APIkey'), required=False, action="append")
parser.add_argument('-s', '--search', help='Search for alias.\n\n',
    nargs=1, metavar=('alias@domain.com'), required=False, action="append")
parser.add_argument('-m', '--server', help='Add/Update mailcow instance.\n\n',
    nargs=1, metavar=('mailcow-server.tld'), required=False, action="append")
parser.add_argument('-a', '--add', help='Add new alias.\n\n',
    nargs=2, metavar=('alias@domain.com', 'to@domain.com'), required=False, action="append")
parser.add_argument('-t', '--timed', help='Add new time limited alias for user on domain. One year validity\n\n',
    nargs=2, metavar=('user@domain.com', 'domain.com'), required=False, action="append")
parser.add_argument('-d', '--delete', help='Delete alias.\n\n',
    nargs=1, metavar=('alias@domain.com'), required=False, action="append")
# The import option is currently disabled:
# parser.add_argument('-i', '--import', help='Show current config and appliacation info\n\n',
#     nargs=1, metavar=('alias.json'), required=False, action="append")
parser.add_argument('-v', '--version', help='Show current version and information\n\n',
    required=False, action='store_true')
parser.add_argument('-c', '--copy', help='Copy alias data from mailcow server to local DB.\n\n',
    required=False, action='store_true')
parser.add_argument('-l', '--list', help='List all aliases on the Mailcow instance.\n\n',
    required=False, action='store_true')
parser.add_argument('-o', '--domains', help='List all mail domains on the Mailcow instance.\n\n',
    required=False, action='store_true')
args = vars(parser.parse_args())
if args['api']:
    apikey(args['api'][0][0])
elif args['search']:
    search(args['search'][0][0])
elif args['server']:
    set_mailserver(args['server'][0][0])
elif args['add']:
    create(args['add'][0][0],args['add'][0][1])
elif args['timed']:
    create_timed(args['timed'][0][0],args['timed'][0][1])
elif args['delete']:
    delete_alias(args['delete'][0][0])
# .get(): while the import option is disabled the key does not exist, and a
# plain lookup would raise KeyError before any later branch could run.
elif args.get('import'):
    import_data(args['import'][0][0])
elif args['version']:
    show_current_info()
elif args['copy']:
    copy_data()
elif args['list']:
    list_alias()
elif args['domains']:
    get_mail_domains(True)
else:
    print('\n\nEh, sorry! I need something more to help you! If you write [b]malias -h[/b] I\'ll show a help screen to get you going!!!\n\n\n')