jeudi 1 avril 2021

What design pattern fits managing Active Directory Security Groups via the ldap3 library with python?

Issue

I wrote a script a while back that creates AD security groups. It is triggered automatically whenever we create a new service, and it creates the AD groups based on a template. This works great; however, I quickly realized that a new iteration is required, one that can also delete and update the groups based on changes made to the template.

I can add additional methods and make some slight changes to address this, but since the codebase is still minuscule, I figured this would be a good opportunity to start practicing design patterns, if possible. What approach/pattern should I use when refactoring, and why?

Current Code

#!/usr/bin/env python3
"""Creates the Active Directory groups required by the ***** that authenticate via *****.

Attributes:
    KWARGS (dict): Keyword arguments utilized with the ldap3.Connection
        class.
    LOGGER (logging.Logger): A logging channel.
    SERVER (ldap3.Server): Object representation of the LDAP server.
    SGS (list): A list of groups to be created.
"""
import logging
import os
from sys import stdout

import boto3
import yaml
from ldap3 import Connection, NTLM, Server
from ldap3.extend.microsoft.addMembersToGroups import ad_add_members_to_groups
from ldap3.core.exceptions import LDAPEntryAlreadyExistsResult, LDAPNoSuchObjectResult

# Send log records to standard output through the root logger's handler.
logging.basicConfig(stream=stdout)

# Module-level logging channel; records below INFO are dropped.
LOGGER = logging.getLogger(__name__)
LOGGER.setLevel(logging.INFO)

# LDAP server object built from the LDAP_SERVER environment variable with SSL
# enabled. This runs at import time, so a missing variable raises KeyError
# as soon as the module is loaded.
SERVER = Server(os.environ['LDAP_SERVER'], use_ssl=True)

# Load the group template at import time. The path is relative to the current
# working directory. main() calls SGS.extend(), so the template is expected to
# parse to a list.
# NOTE(review): yaml.FullLoader is used instead of yaml.safe_load -- confirm
# the template file is always trusted.
with open('templates/security_groups.yaml') as fil:
    SGS = yaml.load(fil, Loader=yaml.FullLoader)

def create_security_group(name: str = '', desc: str = '',
                          membership: list = None,
                          domain_local: bool = False) -> None:
    """Adds a security group to Active Directory and links it to other groups.

    A group that already exists is only logged, not treated as an error, and
    the membership step still runs for it.

    Args:
        name: Used as both the cn and the sAMAccountName of the group.
        desc: Text stored in the description attribute of the group.
        membership: Names of groups in the 'MOCK' OU to link the group
            with. Nothing is linked when this is empty or None.
        domain_local: Selects the domain local group type when True,
            otherwise the global security group type.
    """
    group_type = (
        '-2147483644'  # Domain local group
        if domain_local else
        '-2147483646'  # Global Security Group
    )
    group_dn = get_distinguished_name(
        name, ou_='OU WHERE SIMILAR GROUPS ARE STORED AND MANAGED')
    attributes = {
        'cn': name,
        'description': desc,
        'groupType': group_type,
        'sAMAccountName': name,
    }

    with Connection(SERVER, **KWARGS) as conn:
        LOGGER.info('Creating %s.', name)
        try:
            conn.add(group_dn, object_class='Group', attributes=attributes)
        except LDAPEntryAlreadyExistsResult:
            LOGGER.info('%s already exists.', name)

        if not membership:
            return

        LOGGER.info('Adding %s to the following groups: %s', name, membership)
        linked_dns = [
            get_distinguished_name(member, ou_='MOCK') for member in membership
        ]
        try:
            # NOTE(review): ldap3 documents the positional order as
            # (connection, members_dn, groups_dn). As written, the
            # `membership` entries are passed as the members and the new
            # group as the target group -- confirm this is the intended
            # direction, since the log message above reads the other way.
            ad_add_members_to_groups(conn, linked_dns, [group_dn])
        except LDAPNoSuchObjectResult:
            LOGGER.info('Please check if the following groups exists in MOCK:\n\n%s',
                        membership)

def get_connection_args() -> dict:
    """Builds the keyword arguments passed to every ldap3 Connection.

    Returns:
        A new dict that selects NTLM authentication and turns on
        raise_exceptions.
    """
    return dict(authentication=NTLM, raise_exceptions=True)

def get_distinguished_name(name: str, ou_: str = '') -> str:
    """Builds the distinguished name of an entry from its common name.

    Args:
        name: The common name placed in the leading 'cn=' component.
        ou_: The organizational unit path handed to get_ldap_base.
            Ex: 'MOCK'

    Returns:
        'cn=<name>,' followed by the search base for ou_.
    """
    return ','.join((f'cn={name}', get_ldap_base(ou_=ou_)))

def get_ldap_base(ou_: str = '') -> str:
    """Returns the LDAP search base.

    Args:
        ou_: The organizational unit path to add to the search base, written
            outermost unit first and separated by '/'.
            Ex: 'MOCK' or 'PARENT/CHILD'
            Empty segments are ignored, so the default '' yields only the
            domain components.

    Returns:
        The LDAP search base, innermost organizational unit first.
        Ex: 'ou=CHILD,ou=PARENT,dc=example,dc=com'
    """
    dc_ = 'dc=example,dc=com'
    # ''.split('/') is [''], which used to produce the malformed component
    # 'ou=' for the default argument; skip empty segments instead.
    components = [f'ou={unit}' for unit in reversed(ou_.split('/')) if unit]
    components.append(dc_)
    return ','.join(components)

def get_lob_security_groups(team: str = '', owners: str = '', tier: str = '') -> list:
    """Builds the team-specific security group definitions.

    The 'readonly' role is always produced. The 'developer' role is only
    produced for the 'development' tier and the 'sandbox_developer' role
    only for the 'sandbox' tier (tier is compared case-insensitively).

    Args:
        team: The initials of the team.
        owners: Whitespace-separated IDs of the primary and secondary owner
            of the AD group. Each ID is upper-cased and resolved to a name
            with get_name_from_id.
        tier: The tier of the service. Ex: sandbox

    Returns:
        The AD groups to create, as dicts with the keys 'name', 'desc' and
        'membership'. 'name' still contains the '{service}' and
        '{shortname}' placeholders for the caller to format.
    """
    # strip() drops the trailing space left behind when no name is found.
    owner_labels = [
        f'{owner} {get_name_from_id(owner)}'.strip()
        for owner in owners.upper().split()
    ]
    tier_ = tier.lower()

    sgs = []
    for role in ['developer', 'readonly', 'sandbox_developer']:
        if role == 'developer' and tier_ != 'development':
            continue
        if role == 'sandbox_developer' and tier_ != 'sandbox':
            continue
        desc = f'Grants the {team.upper()} {role} access to the service.'
        if role == 'developer':
            desc = f'****: {desc}'
        elif owner_labels:
            # The sentence already ends with a period; do not append another.
            desc = ', '.join(owner_labels) + f': {desc}'
        # With no owners the description is left without a dangling ': '.
        sgs.append({
            'name': '{service}-' + team.lower() + '-{shortname}-' + role,
            'desc': desc,
            'membership': [],
        })
    return sgs

def get_name_from_id(id_: str) -> str:
    """Translates the Active Directory user ID to a first and last name.

    Searches the 'MOCK' organizational unit for a person whose cn equals the
    upper-cased ID.

    Args:
        id_: The Active Directory user ID.

    Returns:
        The givenName and sn of the first matching entry separated by a
        space, or '' when the search returns no entries.
    """
    # Imported locally so this function does not depend on an extra
    # module-level import; ldap3 is already a dependency of this file.
    from ldap3.utils.conv import escape_filter_chars

    base = get_ldap_base(ou_='MOCK')
    # Escape the ID so characters such as '*', '(' or ')' cannot alter the
    # structure of the search filter.
    filt = f'(&(objectclass=person)(cn={escape_filter_chars(id_.upper())}))'
    with Connection(SERVER, **KWARGS) as conn:
        conn.search(base, filt, attributes=['givenName', 'sn'])
        first = next(iter(conn.entries), None)
        if first is not None:
            return f'{first["givenName"]} {first["sn"]}'
    return ''

def main() -> None:
    """Creates every templated and team-specific Active Directory group.

    Reads TEAM, OWNERS, TIER, SERVICE and SHORTNAME from the environment,
    appends the team-specific groups to SGS and creates each entry. Groups
    whose name template contains 'developer' or 'readonly' are created as
    domain local groups; all others as global groups.
    """
    env = os.environ
    team, owners, tier = env['TEAM'], env['OWNERS'], env['TIER']
    SGS.extend(get_lob_security_groups(team=team, owners=owners, tier=tier))

    is_development = tier.lower() == 'development'
    for group in SGS:
        template = group['name']
        # Templates ending in '****' are only created for the development tier.
        if not is_development and template.endswith('****'):
            continue
        domain_local = 'developer' in template or 'readonly' in template
        create_security_group(
            name=template.format(service=env['SERVICE'],
                                 shortname=env['SHORTNAME']),
            desc=group['desc'],
            membership=group['membership'],
            domain_local=domain_local,
        )

if __name__ == '__main__':
    # KWARGS is only bound when the file runs as a script; the functions
    # create_security_group and get_name_from_id read it as a global, so they
    # raise NameError if the module is imported and called without it.
    KWARGS = get_connection_args()
    main()

Aucun commentaire:

Enregistrer un commentaire