malias/malias.py

520 lines
18 KiB
Python
Executable File

#!/usr/bin/python3
import sqlite3
from pathlib import Path
from sqlite3 import Error
import urllib.request
import json
import logging
import argparse
import requests
import os
import time
import sys
import git
from types import SimpleNamespace
from datetime import datetime
from string import ascii_letters, digits
from rich import print
from argparse import RawTextHelpFormatter
from urllib.request import urlopen
# Info pages for dev
# https://mailcow.docs.apiary.io/#reference/aliases/get-aliases/get-aliases
# https://demo.mailcow.email/api/#/Aliases
# Everything malias persists (database and log file) lives in ~/.config/malias.
homefilepath = Path.home()
filepath = homefilepath / '.config/malias'
database = filepath / 'malias.db'
logfile = filepath / 'malias.log'
# Create the directory first: the logging setup below writes into it.
filepath.mkdir(parents=True, exist_ok=True)
logging.basicConfig(
    filename=logfile,
    level=logging.INFO,
    format='%(message)s',
)
app_version = '0.2.4'
def connect_database():
    """Open the malias SQLite database and make sure its schema exists.

    Creates the configuration directory if needed, connects to ``database``,
    creates the ``apikey``, ``settings`` and ``aliases`` tables when they are
    missing and hands the connection to ``first_run`` to seed default rows.

    Returns:
        The open ``sqlite3.Connection``, or ``None`` when connecting failed
        (the error is written to the log and printed).
    """
    Path(filepath).mkdir(parents=True, exist_ok=True)
    try:
        conn = sqlite3.connect(database)
    except Error as e:
        logging.error(time.strftime("%Y-%m-%d %H:%M") + ' - Error : ' + str(e))
        print(e)
        return None

    # Schema setup deliberately lives outside any ``finally`` clause so that
    # an error raised here propagates instead of being masked by a ``return``.
    c = conn.cursor()
    c.execute('''CREATE TABLE IF NOT EXISTS apikey
                 (id integer NOT NULL PRIMARY KEY,
                  api text NOT NULL)''')
    c.execute('''CREATE TABLE IF NOT EXISTS settings (
                    id INTEGER NOT NULL PRIMARY KEY,
                    first_run INTEGER,
                    server TEXT,
                    data_copy INTEGER
                 )''')
    c.execute('''CREATE TABLE IF NOT EXISTS aliases
                 (id integer NOT NULL PRIMARY KEY,
                  alias text NOT NULL,
                  goto text NOT NULL,
                  created text NOT NULL)''')
    conn.commit()
    first_run(conn)
    return conn
def first_run(conn):
    """Seed the placeholder rows the first time the database is used.

    When the ``settings`` table is empty, one row is inserted into
    ``settings`` (id 1, first_run 1, server ``dummy.server``, data_copy 0)
    and one into ``apikey`` (id 1, ``DUMMY_KEY``). Otherwise nothing is
    written.

    Args:
        conn: Open ``sqlite3`` connection whose ``settings`` and ``apikey``
            tables already exist.

    Returns:
        None.
    """
    cursor = conn.cursor()
    cursor.execute('SELECT count(*) FROM settings')
    if cursor.fetchone()[0] != 0:
        return None

    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    # A first run is an expected event rather than a failure: log it as INFO.
    logging.info(now + ' - First run!')
    cursor.execute('INSERT INTO settings values(?,?,?,?)', (1, 1, 'dummy.server', 0))
    cursor.execute('INSERT INTO apikey values(?,?)', (1, 'DUMMY_KEY'))
    conn.commit()
    return None
def get_settings(kind):
    """Return one value from the settings row (id 1) of the local database.

    Args:
        kind: Which value to return: ``'mail_server'``,
            ``'first_run_status'`` or ``'copy_status'``.

    Returns:
        The requested value, or ``None`` for an unrecognised ``kind``.

    Raises:
        SystemExit: With status 1 when ``kind`` is ``'mail_server'`` and the
            stored server is still the ``dummy.server`` placeholder.
    """
    cursor = conn.cursor()
    cursor.execute('SELECT first_run, server, data_copy FROM settings WHERE id = 1')
    first_run_status, mail_server, copy_status = cursor.fetchone()

    if kind == 'mail_server':
        if mail_server == 'dummy.server':
            print('Error: No mailcow server active. Please add one with [b]malias -m [i]your.server[/i][/b]')
            # This is an error path, so report failure to the calling shell.
            sys.exit(1)
        return mail_server
    if kind == 'first_run_status':
        return first_run_status
    if kind == 'copy_status':
        return copy_status
    return None
def get_api():
    """Return the API key stored in the local database.

    Returns:
        The value of the ``api`` column of the first ``apikey`` row.

    Raises:
        SystemExit: With status 1 when the stored key is still the
            ``DUMMY_KEY`` placeholder.
    """
    cursor = conn.cursor()
    cursor.execute('SELECT api FROM apikey')
    apikey = cursor.fetchone()[0]
    if apikey == 'DUMMY_KEY':
        print('Missing API key. Please add with [b]malias -k [i]YOUR-API-KEY[/i][/b]')
        # This is an error path, so report failure to the calling shell.
        sys.exit(1)
    return apikey
def set_mailserver(server):
    """Store the mailcow server host name in the settings row (id 1).

    Args:
        server: Host name of the mailcow instance; other functions in this
            file prepend ``https://`` to it when building API URLs.
    """
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    # ``with conn`` commits on success and rolls back on an exception, so
    # success is only reported once the change is actually persisted.
    with conn:
        conn.execute('UPDATE settings SET server = ? WHERE id = 1', (server,))
    logging.info(now + ' - Info : mailcow server updated')
    print('Your mail server has been updated.')
def apikey(key):
    """Store the mailcow API key in the apikey row (id 1).

    Args:
        key: The API key to save.
    """
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    # ``with conn`` commits on success and rolls back on an exception, so
    # success is only reported once the change is actually persisted.
    with conn:
        conn.execute('UPDATE apikey SET api = ? WHERE id = 1', (key,))
    logging.info(now + ' - Info : API key updated')
    print('Your API key has been updated.')
def get_mail_domains():
    """Print every mail domain on the server with its local alias count.

    Domains are fetched from ``/api/v1/get/domain/all``. For each one the
    local ``aliases`` table is searched for rows whose ``alias`` or ``goto``
    contains the domain name, and the per-domain count plus the totals are
    printed.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://' + mail_server + '/api/v1/get/domain/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once the body is read.
    with urllib.request.urlopen(req) as response:
        domains = json.loads(response.read().decode('utf-8'))

    cursor = conn.cursor()
    total_aliases = 0
    print('\n[b]malias[/b] - All email domains on %s' % (mail_server))
    print('==================================================================')
    for domain in domains:
        name = domain['domain_name']
        pattern = '%' + name + '%'
        cursor.execute('SELECT count(*) FROM aliases where alias like ? or goto like ?', (pattern, pattern))
        count = cursor.fetchone()[0]
        total_aliases += count
        print('%s \t\twith %s aliases' % (name, count))
    print('\n\nThere is a total of %s domains with %s aliases.' % (str(len(domains)), str(total_aliases)))
def create(alias, to_address):
    """Create an alias on the mailcow server and record it locally.

    The alias is posted to ``/api/v1/add/alias``. Creation is then verified
    by looking up the id of the new alias on the server; only when an id is
    found is the alias inserted into the local ``aliases`` table.

    Args:
        alias: Address of the alias to create.
        to_address: Destination the alias forwards to.

    Raises:
        SystemExit: With status 1 when ``checklist`` reports that the alias
            already exists.
    """
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    server = get_settings('mail_server')
    apikey = get_api()

    if checklist(alias):
        logging.error(now + ' - Error : alias %s exists on the mailcow instance %s ' % (alias, server))
        print('\n[b]Error[/b] : alias %s exists on the mailcow instance %s \n' % (alias, server))
        sys.exit(1)

    data = {'address': alias, 'goto': to_address, 'active': "1"}
    headers = {'X-API-Key': apikey, "Content-Type": "application/json"}
    requests.post('https://' + server + '/api/v1/add/alias',
                  data=json.dumps(data), headers=headers)

    # The response body is not inspected; success is judged by whether the
    # server now knows an id for the alias.
    mail_id = alias_id(alias)
    if mail_id is None:
        logging.error(now + ' - Error : alias %s not created on the mailcow instance %s ' % (alias, server))
        # Report the real failure; the log line above says the same thing.
        print('\n[b]Error[/b] : alias %s not created on the mailcow instance %s \n' % (alias, server))
        return

    cursor = conn.cursor()
    cursor.execute('INSERT INTO aliases values(?,?,?,?)', (mail_id, alias, to_address, now))
    conn.commit()
    logging.info(now + ' - Info : alias %s created for %s on the mailcow instance %s ' % (alias, to_address, server))
    print('\n[b]Info[/b] : alias %s created for %s on the mailcow instance %s \n' % (alias, to_address, server))
def delete_alias(alias):
    """Delete an alias from the mailcow server and from the local database.

    The alias id is looked up on the server and posted to
    ``/api/v1/delete/alias``. When the server reports success, the matching
    row (if any) is removed from the local ``aliases`` table.

    Args:
        alias: Address of the alias to delete.
    """
    # Computed up front: every logging branch below needs the timestamp.
    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    server = get_settings('mail_server')
    apikey = get_api()

    if not checklist(alias):
        print('\n[b]Error[/b] : The alias %s not found' % (alias))
        return

    the_alias_id = alias_id(alias)
    if the_alias_id is None:
        # checklist() can report a match for an address that only appears as
        # a destination (goto); there is no alias id to delete in that case.
        print('\n[b]Error[/b] : The alias %s not found' % (alias))
        return

    data = {'id': the_alias_id}
    headers = {'X-API-Key': apikey, "Content-Type": "application/json"}
    response = requests.post('https://' + server + '/api/v1/delete/alias',
                             data=json.dumps(data), headers=headers)
    response_data = response.json()

    if response_data[0]['type'] != 'success':
        logging.error(now + ' - Error : alias %s NOT deleted from the mailcow instance %s. Error : %s' % (alias, server, response_data))
        print('\n[b]Error[/b] : alias %s NOT deleted from the mailcow instance %s. Error : %s' % (alias, server, response_data))
        return

    if check_local_db(the_alias_id) == 1:
        cursor = conn.cursor()
        cursor.execute('DELETE from aliases where id = ?', (the_alias_id,))
        conn.commit()
        logging.info(now + ' - Info : alias %s deleted from the mailcow instance %s and Local DB' % (alias, server))
        print('\n[b]Info[/b] : alias %s deleted from the mailcow instance %s and local DB' % (alias, server))
    else:
        logging.info(now + ' - Info : alias %s deleted from the mailcow instance %s.' % (alias, server))
        print('\n[b]Info[/b] : alias %s deleted from the mailcow instance %s.' % (alias, server))
def copy_data():
    """Import all aliases from the mailcow server into the local database.

    On the first import (``copy_status`` is 0) every alias returned by
    ``/api/v1/get/alias/all`` is stored locally and ``data_copy`` is set
    to 1. On later calls the work is delegated to ``update_data``.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')

    if get_settings('copy_status') != 0:
        print('\n[b]Info[/b] : aliases already imported from the mailcow instance %s to local DB\n\n[i]Updating with any missing aliases![/i]' % (mail_server))
        update_data()
        return

    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    req = urllib.request.Request('https://' + mail_server + '/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once the body is read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))

    cursor = conn.cursor()
    # OR IGNORE: an alias created through create() is already stored under
    # the same id; a plain INSERT would abort the import on the primary key.
    cursor.executemany(
        'INSERT OR IGNORE INTO aliases values(?,?,?,?)',
        [(entry['id'], entry['address'], entry['goto'], now) for entry in remote_aliases],
    )
    cursor.execute('UPDATE settings SET data_copy = ? WHERE id = 1', (1,))
    conn.commit()
    logging.info(now + ' - Info : aliases imported from the mailcow instance %s to local DB' % (mail_server))
    print('\n[b]Info[/b] : aliases imported from the mailcow instance %s to local DB\n' % (mail_server))
def update_data():
    """Add aliases that exist on the server but not in the local database.

    Only acts when ``copy_status`` is 1 (an initial import has been done).
    Every alias returned by ``/api/v1/get/alias/all`` that is not yet stored
    locally is inserted, and the number of added aliases is reported.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    if get_settings('copy_status') != 1:
        return

    now = datetime.now().strftime("%m-%d-%Y %H:%M")
    req = urllib.request.Request('https://' + mail_server + '/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once the body is read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))

    cursor = conn.cursor()
    count_alias = 0
    for entry in remote_aliases:
        # "=" with COLLATE NOCASE keeps the case-insensitive comparison that
        # LIKE gave, but no longer treats "_" and "%" in an address as
        # wildcards. Matching on id as well keeps the INSERT below from
        # colliding with the primary key.
        cursor.execute(
            'SELECT count(*) FROM aliases WHERE id = ? '
            'OR (alias = ? COLLATE NOCASE AND goto = ? COLLATE NOCASE)',
            (entry['id'], entry['address'], entry['goto']),
        )
        if cursor.fetchone()[0] >= 1:
            continue
        cursor.execute('INSERT INTO aliases values(?,?,?,?)',
                       (entry['id'], entry['address'], entry['goto'], now))
        count_alias += 1
    conn.commit()

    if count_alias > 0:
        logging.info(now + ' - Info : Local DB updated with %s new aliases from %s ' % (str(count_alias), mail_server))
        print('\n[b]Info[/b] : Local DB update with %s new aliases from %s \n' % (str(count_alias), mail_server))
    else:
        print('\n[b]Info[/b] : No missing aliases from local DB \n')
def checklist(alias):
    """Report whether an address is already in use as an alias or destination.

    The address is compared against every alias returned by
    ``/api/v1/get/alias/all`` (its ``address`` and its ``goto`` entries) and
    against the ``alias`` and ``goto`` columns of the local database.

    Args:
        alias: The address to look for.

    Returns:
        ``True`` when a match is found, otherwise ``None``.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://' + mail_server + '/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once the body is read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))

    for entry in remote_aliases:
        # Compare whole addresses: a substring test would report
        # "ann@example.com" as existing whenever "joann@example.com" does.
        # NOTE(review): goto is treated as a comma-separated list of
        # destinations -- confirm against the mailcow API.
        destinations = [target.strip() for target in entry['goto'].split(',')]
        if alias == entry['address'] or alias in destinations:
            return True

    cursor = conn.cursor()
    # "=" with COLLATE NOCASE keeps LIKE's case-insensitive comparison
    # without treating "_" and "%" in an address as wildcards.
    cursor.execute(
        'SELECT count(*) FROM aliases WHERE alias = ? COLLATE NOCASE OR goto = ? COLLATE NOCASE',
        (alias, alias),
    )
    if cursor.fetchone()[0] >= 1:
        return True
    return None
def list_alias():
    """Print every alias on the mailcow server, marking those stored locally.

    Aliases are fetched from ``/api/v1/get/alias/all``. A trailing ``*``
    marks each alias whose address is also found in the ``alias`` or
    ``goto`` column of the local database; totals are printed at the end.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://' + mail_server + '/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once the body is read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))

    cursor = conn.cursor()
    in_local_db = 0
    print('\n[b]malias[/b] - All aliases on %s ([b]*[/b] also in local db)' % (mail_server))
    print('==================================================================')
    for entry in remote_aliases:
        the_alias = entry['address'].ljust(20, ' ')
        the_goto = entry['goto'].ljust(20, ' ')
        # "=" with COLLATE NOCASE keeps LIKE's case-insensitive comparison
        # without treating "_" and "%" in an address as wildcards.
        cursor.execute(
            'SELECT count(*) FROM aliases WHERE alias = ? COLLATE NOCASE OR goto = ? COLLATE NOCASE',
            (entry['address'], entry['address']),
        )
        if cursor.fetchone()[0] >= 1:
            print(the_alias + '\tgoes to\t\t' + the_goto + '\t[b]*[/b]')
            in_local_db += 1
        else:
            print(the_alias + '\tgoes to\t\t' + the_goto)
    print('\n\nTotal number of aliases %s on instance [b]%s[/b] and %s on [b]local DB[/b].' % (str(len(remote_aliases)), mail_server, str(in_local_db)))
def alias_id(alias):
    """Return the server-side id of an alias.

    Args:
        alias: Exact address of the alias to look up.

    Returns:
        The ``id`` of the first alias from ``/api/v1/get/alias/all`` whose
        ``address`` equals ``alias``, or ``None`` when there is none.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://' + mail_server + '/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once the body is read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))

    return next(
        (entry['id'] for entry in remote_aliases if entry['address'] == alias),
        None,
    )
def number_of_aliases_in_db():
    """Return the number of rows in the local ``aliases`` table."""
    row = conn.cursor().execute('SELECT count(*) FROM aliases').fetchone()
    return row[0]
def number_of_aliases_on_server():
    """Return how many aliases ``/api/v1/get/alias/all`` reports."""
    apikey = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://' + mail_server + '/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once the body is read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))
    return len(remote_aliases)
def check_local_db(alias_id):
    """Return how many local ``aliases`` rows have the given id.

    Args:
        alias_id: Value to match against the ``id`` column.
    """
    lookup = conn.cursor()
    lookup.execute('SELECT count(*) FROM aliases where id = ?', (alias_id,))
    (matches,) = lookup.fetchone()
    return matches
def search(alias):
    """Print all aliases whose address or destination contains a term.

    When the local database has been populated (``data_copy`` is 1) the
    search runs against the local ``aliases`` table; otherwise the alias
    list is fetched from ``/api/v1/get/alias/all`` and filtered here.

    Args:
        alias: Text to look for in alias addresses and destinations.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    cursor = conn.cursor()
    cursor.execute('SELECT data_copy FROM settings where id = 1')
    data_copied = cursor.fetchone()[0]

    if data_copied == 1:
        search_term = '%' + alias + '%'
        cursor.execute('SELECT * from aliases where alias like ? or goto like ?', (search_term, search_term,))
        rows = cursor.fetchall()
        print('\nAliases on %s that contains %s' % (mail_server, alias))
        print('=================================================================')
        for row in rows:
            # Row layout follows the aliases table: id, alias, goto, created.
            the_alias = row[1].ljust(20, ' ')
            print(the_alias + '\tgoes to\t\t' + row[2])
        print('\n\nData from local DB')
        return

    req = urllib.request.Request('https://' + mail_server + '/api/v1/get/alias/all')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once the body is read.
    with urllib.request.urlopen(req) as response:
        remote_aliases = json.loads(response.read().decode('utf-8'))

    print('\nAliases on %s that contains %s' % (mail_server, alias))
    print('=================================================')
    for entry in remote_aliases:
        if alias in entry['address'] or alias in entry['goto']:
            print(entry['address'] + '\tgoes to\t\t' + entry['goto'])
    print('\n\nData from server')
def get_mailcow_version():
    """Compare the server's mailcow version with the newest upstream tag.

    The running version is read from ``/api/v1/get/status/version``; the
    upstream tags are listed with ``git ls-remote --tags`` against the
    mailcow-dockerized repository on GitHub.

    Returns:
        A human-readable sentence describing the version status.
    """
    apikey = get_api()
    mail_server = get_settings('mail_server')
    req = urllib.request.Request('https://' + mail_server + '/api/v1/get/status/version')
    req.add_header('Content-Type', 'application/json')
    req.add_header('X-API-Key', apikey)
    # The context manager closes the HTTP response once the body is read.
    with urllib.request.urlopen(req) as response:
        status = json.loads(response.read().decode('utf-8'))
    current_version = status['version']

    remote_heads = git.cmd.Git().ls_remote('https://github.com/mailcow/mailcow-dockerized', tags=True)
    tag_lines = remote_heads.splitlines()
    if not tag_lines:
        # Without any tag there is nothing to compare against.
        return 'Your Mailcow version is %s (the latest version could not be determined)' % current_version

    # NOTE(review): the last listed tag is taken as the latest release; this
    # relies on the order in which ls-remote prints refs -- confirm.
    latest_version = tag_lines[-1].rsplit('/', 1)[-1]
    if latest_version.endswith('^{}'):
        # ls-remote lists annotated tags a second time with a "^{}" suffix.
        latest_version = latest_version[:-3]

    if current_version != latest_version:
        return 'Your Mailcow version is %s and the latest is %s' % (current_version, latest_version)
    return 'You have the latest Mailcow version %s' % current_version
def show_current_info():
    """Print the current configuration, alias counts and version details."""
    API = get_api()
    mail_server = get_settings('mail_server')
    # NOTE(review): get_api() and get_settings('mail_server') terminate the
    # program when they find the placeholder values, so these two fallbacks
    # are not expected to trigger; kept in case that behaviour changes.
    if API == 'DUMMY_KEY':
        API = 'Missing API Key!'
    if mail_server == 'dummy.server':
        mail_server = 'Missing address to mailcow instance!'
    aliases_server = number_of_aliases_on_server()
    alias_db = number_of_aliases_in_db()
    mailcow_version = get_mailcow_version()
    print('\n[b]malias[/b] - Manage aliases on mailcow Instance.')
    print('===================================================')
    print('API key : [b]%s[/b]' % (API))
    print('mailcow Instance : [b]%s[/b]' % (mail_server))
    print('Mailcow version : [b]%s[/b]' % (mailcow_version))
    print('Logfile : [b]%s[/b]' % (logfile))
    # The bold tag is closed with [/b]; it used to be opened a second time.
    print('Database : [b]%s[/b]' % (database))
    print('Aliases on server : [b]%s[/b]' % (aliases_server))
    print('Aliases in DB : [b]%s[/b]' % (alias_db))
    print('')
    print('App version : [b]%s[/b] (https://iurl.no/malias)' % (app_version))
    print('')
# Shared database connection used by every function in this file.
conn = connect_database()

parser = argparse.ArgumentParser(
    prog='malias',
    description='This is a simple application to help you create and delete aliases on a mailcow instance.\nIf you find any issues or would like to submit a PR - please head over to https://gitlab.pm/rune/malias. \n\nI hope this makes your mailcow life a bit easier!',
    formatter_class=RawTextHelpFormatter,
    epilog='Making mailcow easier...',
)

# Options that take values. With action="append" and nargs, each one is
# parsed into a list of lists, hence the [0][0] indexing further down.
parser.add_argument('-k', '--api', nargs=1, action="append",
                    metavar='APIkey',
                    help='Add/Change API key.\n\n')
parser.add_argument('-s', '--search', nargs=1, action="append",
                    metavar='alias@domain.com',
                    help='Search for alias.\n\n')
parser.add_argument('-m', '--server', nargs=1, action="append",
                    metavar='mailcow-server.tld',
                    help='Add/Uppdate mailcow instance.\n\n')
parser.add_argument('-a', '--add', nargs=2, action="append",
                    metavar=('alias@domain.com', 'to@domain.com'),
                    help='Add new alias.\n\n')
parser.add_argument('-d', '--delete', nargs=1, action="append",
                    metavar='alias@domain.com',
                    help='Delete alias.\n\n')

# Plain on/off switches.
parser.add_argument('-i', '--info', action='store_true',
                    help='Show current version and config info\n\n')
parser.add_argument('-c', '--copy', action='store_true',
                    help='Copy alias data from mailcow server to local DB.\n\n')
parser.add_argument('-l', '--list', action='store_true',
                    help='List all aliases on the Mailcow instance.\n\n')
parser.add_argument('-o', '--domains', action='store_true',
                    help='List all mail domains on the Mailcow instance.\n\n')

args = vars(parser.parse_args())

# Checked in this order; only the first option that was given is acted on.
_dispatch = (
    ('api', lambda given: apikey(given[0][0])),
    ('search', lambda given: search(given[0][0])),
    ('server', lambda given: set_mailserver(given[0][0])),
    ('add', lambda given: create(given[0][0], given[0][1])),
    ('delete', lambda given: delete_alias(given[0][0])),
    ('info', lambda given: show_current_info()),
    ('copy', lambda given: copy_data()),
    ('list', lambda given: list_alias()),
    ('domains', lambda given: get_mail_domains()),
)
for _option, _handler in _dispatch:
    if args[_option]:
        _handler(args[_option])
        break
else:
    # No option was given at all.
    print('\n\nEh, sorry! I need something more to help you! If you write [b]malias -h[/b] I\'ll show a help screen to get you going!!!\n\n\n')