source: dassldapsync/dassldapsync.py

Last change on this file was 1221, checked in by joergs, 5 years ago

added syncrepl

  • Property svn:executable set to *
File size: 23.2 KB
Line 
1#!/usr/bin/python
2# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
3
4# core modules
5import argparse
6import ConfigParser
7import logging
8from   pprint import pprint
9import signal
10import subprocess
11import sys
12import time
13
14# external modules
15import datetime
16import dateutil.parser
17import dateutil.tz
18import ldap
19from   ldap.ldapobject import ReconnectLDAPObject
20import ldap.modlist
21from   ldap.syncrepl import SyncreplConsumer
22import ldapurl
23import ldif
24
25
26
def getArguments():
    """
    Parse the command line.

    Returns the argparse namespace with two attributes:
    ``debug`` (bool, set by -d/--debug) and ``configfile`` (path of the
    configuration file, /etc/dassldapsync.conf when not given).
    """
    configfile = '/etc/dassldapsync.conf'
    parser = argparse.ArgumentParser(description='Synchronize the content of two LDAP servers.')
    parser.add_argument('-d', '--debug', action='store_true', help="enable debug output")
    # nargs='?' makes the positional optional. Without it argparse insists
    # on the argument and the default below is never used.
    parser.add_argument('configfile', nargs='?', default=configfile,
                        help="Configuration file [default: {}]".format(configfile))
    return parser.parse_args()
34
35
class Options(object):
    """
    Settings consulted by the synchronisation code.

    Every attribute starts with the default assigned below; callers
    overwrite the ones they need after construction.
    """

    def __init__(self):
        # switches
        self.starttls = False
        self.updateonly = False
        self.delete = True

        # restrictions on what gets synchronised (None: no restriction)
        self.exclude = None
        self.attrlist = None
        self.filter = None

        # rename detection
        self.renamecommand = None
        self.renameattr = None

        # password age limit in days, 0 by default
        self.pwd_max_days = 0
47
class ConfigParserDefaults(ConfigParser.ConfigParser, object):
    """
    ConfigParser whose getters take a fallback value for missing options.

    The base class is named explicitly in every super() call:
    super(self.__class__, self) recurses endlessly as soon as this class
    is subclassed.
    """

    def get(self, section, option, default=None, **kwargs):
        """
        Return the value of ``option`` in ``section``.

        If the option is missing, ``default`` is returned when it is not
        None; otherwise NoOptionError is re-raised. Additional keyword
        arguments (e.g. raw, vars) are handed to the base class.
        """
        try:
            return super(ConfigParserDefaults, self).get(section, option, **kwargs)
        except ConfigParser.NoOptionError:
            if default is None:
                raise
            return default

    def get_section(self, section):
        """Return the stored option dictionary of ``section``, or None if it does not exist."""
        return self._sections.get(section)

    def get0(self, section, option, default=None):
        """Like get(), but a missing option always yields ``default`` (even None)."""
        try:
            return super(ConfigParserDefaults, self).get(section, option)
        except ConfigParser.NoOptionError:
            return default

    def getboolean(self, section, option, default=None):
        """
        Return ``option`` as boolean.

        If the option is missing, ``default`` is returned when it is not
        None; otherwise NoOptionError is re-raised.
        """
        try:
            return super(ConfigParserDefaults, self).getboolean(section, option)
        except ConfigParser.NoOptionError:
            if default is None:
                raise
            return default

    def get_ldap_url_obj(self, section):
        """
        Build an ldapurl.LDAPUrl from the options of ``section``.

        Uses server, baseDn, bindDn, bindPassword, filter and attributes
        (comma separated). Raises NoSectionError if the section is missing.
        """
        settings = self.get_section(section)
        if settings is None:
            raise ConfigParser.NoSectionError(section)
        baseurl = 'ldap://{server}:389/{basedn}'.format(**settings)

        attrs = None
        attributes = self.get0(section, 'attributes')
        if attributes is not None:
            attrs = attributes.split(',')

        # The rest of this file calls the option 'bindPassword'. The former
        # name 'basePassword' is still honoured as fallback.
        password = self.get0(section, 'bindPassword')
        if password is None:
            password = self.get0(section, 'basePassword')

        return ldapurl.LDAPUrl(
            baseurl,
            dn=self.get(section, 'baseDn', ''),
            who=self.get0(section, 'bindDn'),
            cred=password,
            filterstr=self.get0(section, 'filter'),
            attrs=attrs
        )
93
94
def readLDIFSource(path):
    """
    Parse the LDIF file at ``path`` and return the records collected by
    ldif.LDIFRecordList (its ``all_records`` attribute).
    """
    logging.getLogger().info("reading LDAP objects from file {}".format(path))
    with open(path, 'r') as ldif_file:
        record_list = ldif.LDIFRecordList(ldif_file)
        record_list.parse()
        return record_list.all_records
103
def readLdapSource(server, binddn, bindpw, basedn, filterstr, attrlist=None, starttls=False):
    """
    Read all entries below ``basedn`` from the LDAP server ``server`` (port 389).

    server:    host name of the source LDAP server
    binddn:    DN used for the simple bind
    bindpw:    password used for the simple bind
    basedn:    search base, searched with subtree scope
    filterstr: LDAP filter; None or '' selects all entries
    attrlist:  attributes to retrieve, passed on to search_s
    starttls:  issue StartTLS before binding

    Returns the result of search_s.
    """
    logger = logging.getLogger()
    logger.info("reading LDAP objects from server {}".format(server))
    # ldap.open() is deprecated, ldap.initialize() with an URL replaces it
    con = ldap.initialize('ldap://{}:389'.format(server))
    try:
        if starttls:
            con.start_tls_s()
        con.simple_bind_s(binddn, bindpw)
        # search_s does not accept None as filter
        return con.search_s(basedn, ldap.SCOPE_SUBTREE,
                            filterstr or '(objectClass=*)', attrlist)
    finally:
        # do not leave the connection open, whatever happened above
        con.unbind_s()
113
class LdapSync(object):
    """
    Write a list of source LDAP entries to a destination LDAP server.

    sync() adapts the source entries (base DN, object classes, attribute
    names), adds or modifies them on the destination and, depending on the
    options, deletes destination entries without a source counterpart.
    The outcome of each operation is recorded in self.result.
    """

    def __init__(self, destserver,
                 destbinddn, destbindpw,
                 srcbasedn, destbasedn, options=None):
        """
        destserver:  host name of the destination LDAP server (port 389)
        destbinddn:  DN used to bind to the destination
        destbindpw:  password used to bind to the destination
        srcbasedn:   base DN of the source entries
        destbasedn:  base DN on the destination
        options:     Options instance; a new one with defaults if None
        """
        self.logger = logging.getLogger()

        self.destserver = destserver
        self.destbasedn = destbasedn
        self.destbinddn = destbinddn
        self.destbindpw = destbindpw
        # Created here and not as default value in the signature: a default
        # Options() instance would be shared by all LdapSync objects.
        self.options = Options() if options is None else options

        self.srcbasedn = srcbasedn

        self.con = None

        # attribute name -> new attribute name, case insensitive keys
        self.attrmap = ldap.cidict.cidict({})
        # lower case object class -> replacement object class
        self.classmap = {}

        # attributes that are never written to the destination
        self.junk_attrs = ["memberof", "modifiersname", "modifytimestamp", "entryuuid",
                           "entrycsn", "contextcsn", "creatorsname", "createtimestamp",
                           "structuralobjectclass", "pwdchangedtime", "pwdfailuretime"]

        self.reset_result()


    def reset_result(self):
        """Forget the outcome of all previous add/update/delete operations."""
        self.result = {
            'add': {'ok': [], 'failed': []},
            'update': {'ok': [], 'failed': []},
            'delete': {'ok': [], 'failed': []},
        }


    def _dest_ldap_connect(self):
        """Connect and bind to the destination server, unless already connected."""
        if self.con is not None:
            return
        self.logger.info("connect to destination LDAP server {}".format(self.destserver))
        # ldap.open() is deprecated, ldap.initialize() with an URL replaces it
        self.con = ldap.initialize('ldap://{}:389'.format(self.destserver))
        if self.options.starttls:
            self.con.start_tls_s()
        self.con.simple_bind_s(self.destbinddn, self.destbindpw)

    def __adapt_dn(self, dn):
        """
        Move a DN from the source base to the destination base.

        DNs that are not located below the source base are returned
        unchanged.
        """
        if self.srcbasedn == self.destbasedn:
            return dn
        src = self.srcbasedn
        if not src or not dn.lower().endswith(src.lower()):
            # Cutting off len(src) characters regardless of their content
            # would garble the DN (and dn[:-0] is the empty string).
            self.logger.debug("{} is not below {}, not moved".format(dn, src))
            return dn
        moved = dn[:len(dn) - len(src)] + self.destbasedn
        self.logger.debug("moved {} to {}".format(dn, moved))
        return moved

    def __is_dn_included(self, dn):
        """Return False if dn lies in the excluded subtree (options.exclude), else True."""
        exclude = self.options.exclude
        if exclude is None:
            return True
        # compare case insensitive, whatever case the option was given in
        return not dn.lower().endswith(exclude.lower())

    def __adapt_source_ldap_objects(self, searchresult):
        """
        Do configured modification to the source LDAP objects.

        searchresult is a sequence of (dn, attributes) pairs. Returns a
        list of (dn, cidict) pairs with adapted DN, object classes replaced
        according to self.classmap and attributes renamed according to
        self.attrmap. Entries of the excluded subtree are dropped.
        """
        self.logger.debug("modifying LDAP objects retrieved from source LDAP")

        update_objects = []

        for r in searchresult:
            dn = self.__adapt_dn(r[0])
            d = ldap.cidict.cidict(r[1])

            if not self.__is_dn_included(dn):
                continue

            if "objectclass" in d:
                newObjectclasses = []
                for o in d["objectclass"]:
                    new_oc = self.classmap.get(o.lower(), o)
                    if new_oc not in newObjectclasses:
                        newObjectclasses.append(new_oc)
                d["objectclass"] = newObjectclasses

            # iterate over a copy of the keys, d is modified in the loop
            for a in list(d.keys()):
                if a.lower() not in self.attrmap:
                    continue
                attr = self.attrmap[a].lower()
                if attr != a.lower():
                    values = d[a]
                    del d[a]
                    d[attr] = values

            update_objects.append((dn, d))
        return update_objects


    def _get_dest_entry(self, dn, entry):
        """
        In the destination LDAP, the objects should be named
        according to options.renameattr.

        If options.renameattr is set, the destination is searched for an
        entry with the same value of that attribute. When it is found under
        a different DN, it is renamed and notify_renamed() is called.
        Otherwise the entry is looked up by dn.

        Returns (dn, attributes) of the destination entry.
        Raises ldap.NO_SUCH_OBJECT if dn does not exist on the destination.
        """
        # python-ldap submodule, imported here to keep this class self-contained
        from ldap.filter import escape_filter_chars

        attrlist = self.options.attrlist
        renameattr = self.options.renameattr

        existingDestDn = None
        existingDestEntry = None
        if renameattr and renameattr in entry:
            value = entry[renameattr][0]
            # escape the value, it may contain characters special to filters
            searchresult = self.con.search_s(
                self.destbasedn,
                ldap.SCOPE_SUBTREE,
                '%s=%s' % (renameattr, escape_filter_chars(value)), attrlist)
            if searchresult:
                existingDestDn, existingDestEntry = searchresult[0]
                if existingDestDn.lower() != dn.lower():
                    # NOTE(review): modrdn_s() takes the new RDN as second
                    # argument, here the complete DN is passed - confirm.
                    self.con.modrdn_s(existingDestDn, dn)
                    # attribute names of the search result may differ in case
                    oldvalue = ldap.cidict.cidict(existingDestEntry).get(renameattr, [None])[0]
                    # self.options, not a module level variable "options"
                    self.notify_renamed(existingDestDn, dn,
                                        oldvalue, value,
                                        self.options)
        if existingDestDn is None:
            searchresult = self.con.search_s(dn, ldap.SCOPE_BASE, 'objectclass=*', attrlist)
            existingDestDn, existingDestEntry = searchresult[0]
        return (existingDestDn, existingDestEntry)


    def __handle_pwdAccountLockedTime(self, dn, entry, now, max_age):
        """
        Hack for syncing accounts locked by password policy.

        Only active if options.pwd_max_days > 0 and entry has pwdChangedTime.
        If the password is older than max_age, pwdAccountLockedTime is set
        in entry. Otherwise True is returned when the destination entry dn
        carries pwdAccountLockedTime, so that the caller can remove it.
        """
        if self.options.pwd_max_days <= 0 or 'pwdChangedTime' not in entry:
            return False

        pwdChange = entry['pwdChangedTime'][0]
        changed = dateutil.parser.parse(pwdChange)
        if (now - changed) > max_age:
            entry['pwdAccountLockedTime'] = ['000001010000Z']
            self.logger.info("locking {} {}".format(dn, pwdChange))
            return False

        # pwdAccountLockedTime is a operational attribute,
        # and therefore not part of entry.
        # Do extra search to retrieve attribute.
        searchresult = self.con.search_s(
            dn, ldap.SCOPE_BASE,
            "objectclass=*", attrlist=['pwdAccountLockedTime'])
        return 'pwdAccountLockedTime' in searchresult[0][1]


    def _syncLdapObject(self, srcDn, srcAttributes):
        """
        Bring the destination entry srcDn in line with srcAttributes.

        An existing entry is modified; only attributes of options.attrlist
        (if set) and none of self.junk_attrs are touched. A missing entry
        is added unless options.updateonly is set. Success and failure are
        reported through notify_modified() and notify_created().
        """
        tzutc = dateutil.tz.gettz('UTC')
        now = datetime.datetime.now(tzutc)
        max_age = datetime.timedelta(days=self.options.pwd_max_days)

        try:
            destDn, destAttributes = self._get_dest_entry(srcDn, srcAttributes)

            # hack for syncing accounts locked by password policy
            do_unlock = self.__handle_pwdAccountLockedTime(srcDn, srcAttributes, now, max_age)

            mod_attrs = ldap.modlist.modifyModlist(destAttributes, srcAttributes)

            # hack for unlocking, see above
            if do_unlock:
                self.logger.info("unlocking {} {}".format(destDn, 'pwdAccountLockedTime'))
                mod_attrs.append((ldap.MOD_DELETE, 'pwdAccountLockedTime', None))

            if self.options.attrlist is not None:
                # Compare lower case with lower case. The configured names
                # may be mixed case and would never match otherwise.
                wanted = set(name.lower() for name in self.options.attrlist)
                mod_attrs = [a for a in mod_attrs if a[1].lower() in wanted]

            if self.junk_attrs is not None:
                mod_attrs = [a for a in mod_attrs if a[1].lower() not in self.junk_attrs]

            if mod_attrs:
                try:
                    self.logger.debug('mod_attrs: ' + str(mod_attrs))
                    self.con.modify_s(srcDn, mod_attrs)
                    self.notify_modified(srcDn)
                except Exception:
                    # best effort: record the failure and go on with the
                    # next entry, but let KeyboardInterrupt/SystemExit pass
                    self.logger.exception('modify failed')
                    self.notify_modified(srcDn, False)

        except ldap.NO_SUCH_OBJECT:
            if not self.options.updateonly:
                try:
                    self.con.add_s(srcDn, ldap.modlist.addModlist(srcAttributes, self.junk_attrs))
                    self.notify_created(srcDn)
                except (ldap.OBJECT_CLASS_VIOLATION,
                        ldap.NO_SUCH_OBJECT,
                        ldap.CONSTRAINT_VIOLATION):
                    self.notify_created(srcDn, False)


    def __syncLdapDestination(self, update_objects):
        """Add or modify every (dn, entry) pair of update_objects on the destination."""
        self.logger.debug("writing data to destination LDAP")
        for dn, entry in update_objects:
            self._syncLdapObject(dn, entry)


    def __deleteDestLdapObjects(self, update_objects):
        """
        Remove all LDAP objects in destination LDAP server
        that did not come from the source LDAP objects
        and are not excluded.
        """
        # search_s does not accept None as filter
        searchresult = self.con.search_s(self.destbasedn, ldap.SCOPE_SUBTREE,
                                         self.options.filter or '(objectClass=*)')

        keep = set(dn.lower() for dn, _entry in update_objects)
        keep.add(self.destbasedn.lower())
        morituri = set(x[0].lower() for x in searchresult if x[0]) - keep

        # A child DN is always longer than the DN of its parent. Deleting the
        # longest DNs first removes children before their parents.
        for dn in sorted(morituri, key=lambda name: (-len(name), name)):
            if not self.__is_dn_included(dn):
                continue
            try:
                self.con.delete_s(dn)
                self.notify_deleted(dn)
            except Exception:
                self.notify_deleted(dn, False)


    def sync(self, searchresult):
        """
        Synchronize entries from searchresult to destination LDAP server.

        searchresult is a sequence of (dn, attributes) pairs. Nothing is
        done if it is empty. A summary is printed at the end.
        """
        if not searchresult:
            self.logger.error("empty source, aborting")
            return

        self._dest_ldap_connect()
        try:
            update_objects = self.__adapt_source_ldap_objects(searchresult)
            self.__syncLdapDestination(update_objects)
            if self.options.delete and not self.options.updateonly:
                self.__deleteDestLdapObjects(update_objects)
        finally:
            self.con.unbind()
            # an unbound connection is unusable, the next sync() must reconnect
            self.con = None

        self.__log_summary(True)


    def __log_summary(self, show_failed=True, show_ok=False):
        """Print the number of succeeded and failed operations, optionally with the DNs."""
        for action in ('add', 'update', 'delete'):
            succeeded = self.result[action]['ok']
            failed = self.result[action]['failed']
            print("{} (ok: {}, failed: {}):".format(action, len(succeeded), len(failed)))

            if show_ok and succeeded:
                print("succeeded:")
                print("\n".join(succeeded))

            if show_failed and failed:
                print("failed:")
                print("\n".join(failed))

    def get_short_dn(self, dn):
        """Return dn in lower case with the source base DN removed."""
        return dn.lower().replace(',' + self.srcbasedn.lower(), '')

    def notify_created(self, dn, ok=True):
        """Log and record the outcome of adding dn."""
        if ok:
            self.logger.debug('{} created'.format(self.get_short_dn(dn)))
            self.result['add']['ok'].append(dn)
        else:
            self.logger.warning("failed to add {}".format(dn))
            self.result['add']['failed'].append(dn)

    def notify_modified(self, dn, ok=True):
        """Log and record the outcome of modifying dn."""
        if ok:
            self.logger.debug('{} modified'.format(self.get_short_dn(dn)))
            self.result['update']['ok'].append(dn)
        else:
            self.logger.error("failed to modify {}".format(dn))
            self.result['update']['failed'].append(dn)

    def notify_deleted(self, dn, ok=True):
        """Log and record the outcome of deleting dn."""
        if ok:
            self.logger.debug('{} deleted'.format(self.get_short_dn(dn)))
            self.result['delete']['ok'].append(dn)
        else:
            self.logger.error("failed to delete {}".format(dn))
            self.result['delete']['failed'].append(dn)

    def notify_renamed(self, dn, newdn, uid, newuid, options):
        """
        Report a renamed entry and run options.renamecommand with the
        arguments dn, newdn, uid and newuid.

        Raises subprocess.CalledProcessError if the command fails.
        """
        import shlex

        print("renamed {} {}".format(dn, newdn))
        if not options.renamecommand:
            self.logger.debug("no rename command configured")
            return
        # The values are passed as separate arguments and not through a
        # shell: DNs contain spaces and other characters special to it.
        command = shlex.split(options.renamecommand)
        command.extend(str(arg) for arg in (dn, newdn, uid, newuid))
        subprocess.check_call(command)
410
411
412
class SyncReplConsumer(ReconnectLDAPObject, SyncreplConsumer):
    """
    Syncrepl Consumer interface

    Keeps track of the entry UUIDs reported by the source server and hands
    changed entries to a callback.
    """

    def __init__(self, dest, syncrepl_entry_callback, *args, **kwargs):
        """
        dest:                    stored as self.dest_ldap
        syncrepl_entry_callback: called as callback(dn, attributes) for
                                 entries received while a cookie is set
        args, kwargs:            passed to ReconnectLDAPObject
        """
        self.logger = logging.getLogger()
        # Initialise the LDAP Connection first
        ldap.ldapobject.ReconnectLDAPObject.__init__(self, *args, **kwargs)
        # We need this for later internal use
        self.__presentUUIDs = dict()
        self.cookie = None
        self.dest_ldap = dest
        self.syncrepl_entry_callback = syncrepl_entry_callback

    def syncrepl_get_cookie(self):
        """Return the stored cookie (None if none was set yet)."""
        return self.cookie

    def syncrepl_set_cookie(self, cookie):
        """Store the cookie (kept in memory only)."""
        self.cookie = cookie

    def syncrepl_entry(self, dn, attributes, uuid):
        """Register the entry and pass it to the callback if a cookie is set."""
        # First we determine the type of change we have here
        # (and store away the previous data for later if needed)
        change_type = 'modify' if uuid in self.__presentUUIDs else 'add'
        # Now we store our knowledge of the existence of this entry
        self.__presentUUIDs[uuid] = dn
        # Debugging
        self.logger.debug('{}: {} ({})'.format(dn, change_type, ",".join(attributes.keys())))
        # If we have a cookie then this is not our first time being run,
        # so it must be a change
        if self.cookie is not None:
            self.syncrepl_entry_callback(dn, attributes)


    def syncrepl_delete(self, uuids):
        """Forget the given UUIDs. Unknown UUIDs are ignored."""
        # Make sure we know about the UUID being deleted, just in case...
        known = [uuid for uuid in uuids if uuid in self.__presentUUIDs]
        # Delete all the UUID values we know of
        for uuid in known:
            # str.format() here: logging itself only understands %-style
            # placeholders, '{}' with extra arguments is a formatting error
            self.logger.debug('detected deletion of entry {} ({})'.format(
                uuid, self.__presentUUIDs[uuid]))
            del self.__presentUUIDs[uuid]

    def syncrepl_present(self, uuids, refreshDeletes=False):
        """ called on initial sync """
        # If we have not been given any UUID values,
        # then we have received all the present controls...
        if uuids is None:
            # We only do things if refreshDeletes is false as the syncrepl
            # extension will call syncrepl_delete instead when it detects a
            # delete notice
            if not refreshDeletes:
                self.syncrepl_delete(list(self.__presentUUIDs))
            # Phase is now completed, reset the list
            self.__presentUUIDs = {}
            return

        self.logger.debug('uuids: {}'.format(','.join(uuids)))
        # Note down all the UUIDs we have been sent
        for uuid in uuids:
            self.__presentUUIDs[uuid] = True


    def syncrepl_refreshdone(self):
        """Log the end of the initial synchronization."""
        self.logger.info('Initial synchronization is now done, persist phase begins')
485        #self.logger.debug('UUIDs:\n' + '\n'.join(self.__presentUUIDs))
486
487
488
class LdapSyncRepl(LdapSync):
    """
    Continuous synchronisation: follows the source LDAP server with
    syncrepl (refreshAndPersist) and applies changed entries to the
    destination until shutdown() is called.
    """

    def __init__(self, destsrv,
                 destadmindn, destadminpw,
                 basedn, destbasedn,
                 options=None, source_ldap_url_obj=None):
        """
        destsrv, destadmindn, destadminpw, basedn, destbasedn, options:
            passed to LdapSync; options defaults to a new Options instance
        source_ldap_url_obj:
            LDAP URL object of the source; sync() uses its initializeUrl(),
            who, cred, dn, scope, attrs and filterstr
        """
        # Install our signal handlers
        signal.signal(signal.SIGTERM, self.shutdown)
        self.watcher_running = False
        self.source_ldap_url_obj = source_ldap_url_obj
        self.ldap_credentials = False
        self.source_ldap_connection = None
        # Created here and not as default value in the signature: a default
        # Options() instance would be shared by all objects.
        if options is None:
            options = Options()
        super(LdapSyncRepl, self).__init__(destsrv,
                                           destadmindn, destadminpw,
                                           basedn, destbasedn, options)


    def sync(self):
        """
        Connect to the source server and process changes until shutdown.

        The connection is re-established after errors. The program exits
        with status 1 if the source server rejects the credentials.
        """
        self._dest_ldap_connect()
        self.watcher_running = True
        source = self.source_ldap_url_obj
        while self.watcher_running:
            self.logger.info('Connecting to source LDAP server')
            # Prepare the LDAP server connection (triggers the connection as well)
            self.source_ldap_connection = SyncReplConsumer(self.con,
                                                           self.perform_application_sync_callback,
                                                           source.initializeUrl())

            if source.who and source.cred:
                self.ldap_credentials = True
                # Now we login to the LDAP server
                try:
                    self.source_ldap_connection.simple_bind_s(source.who, source.cred)
                except ldap.INVALID_CREDENTIALS as e:
                    print('Login to LDAP server failed:  {}'.format(e))
                    sys.exit(1)
                except ldap.SERVER_DOWN:
                    print('LDAP server is down, going to retry.')
                    time.sleep(5)
                    continue

            # Commence the syncing
            self.logger.info('Starting sync process')
            try:
                # The search is part of the guarded section: without a bind,
                # this is the first request and fails if the server is down.
                ldap_search = self.source_ldap_connection.syncrepl_search(
                    source.dn or '',
                    source.scope or ldap.SCOPE_SUBTREE,
                    mode='refreshAndPersist',
                    attrlist=source.attrs,
                    filterstr=source.filterstr or '(objectClass=*)'
                )

                while self.source_ldap_connection.syncrepl_poll(all=1, msgid=ldap_search):
                    # progress indicator
                    sys.stdout.write('. ')
                    sys.stdout.flush()
            except KeyboardInterrupt:
                # User asked to exit
                print("aborted\n")
                self.shutdown(None, None)
            except Exception:
                # Handle any exception
                if self.watcher_running:
                    self.logger.exception('Encountered a problem, going to retry.')
                    time.sleep(5)

    def perform_application_sync_callback(self, dn, attributes):
        """
        Apply one changed source entry to the destination.

        Returns False if ldap.NO_SUCH_OBJECT was raised, else True.
        """
        self.logger.debug('{}: src: {}'.format(dn, str(attributes)))
        try:
            self._syncLdapObject(dn, attributes)
        except ldap.NO_SUCH_OBJECT:
            self.logger.info("SKIPPED: {} object does not exist on target".format(dn))
            return False
        return True

    def shutdown(self, signum, stack):
        """Signal handler (SIGTERM): make the loop in sync() stop."""
        self.logger.info('Shutting down!')

        # We are no longer running
        self.watcher_running = False
567
568
if __name__ == "__main__":
    # Read the configuration file and run either a one-shot synchronisation
    # (source: LDIF file or LDAP search) or a continuous one (syncrepl).
    logging.basicConfig(format='%(levelname)s %(module)s.%(funcName)s: %(message)s', level=logging.INFO)
    logger = logging.getLogger()

    args = getArguments()
    if args.debug:
        logger.setLevel(logging.DEBUG)
    conffile = args.configfile

    config = ConfigParserDefaults()
    # read() skips unreadable files silently and returns the files it parsed
    if not config.read(conffile):
        logger.error("failed to read configuration file {}".format(conffile))
        sys.exit(1)

    # source
    srcfile = config.get0("source", "file")
    basedn = config.get("source", "baseDn")
    # without configured filter all entries are selected; None is not a valid filter
    filterstr = config.get0("source", "filter", None) or '(objectClass=*)'

    if srcfile is None:
        srv = config.get("source", "server")
        admindn = config.get("source", "bindDn")
        adminpw = config.get("source", "bindPassword")
        # optional, like starttls of the destination
        starttls = config.getboolean("source", "starttls", False)

    # destination
    destsrv = config.get("destination", "server")
    destadmindn = config.get("destination", "bindDn")
    destadminpw = config.get("destination", "bindPassword")
    destbasedn = config.get("destination", "baseDn")
    destdelete = config.getboolean("destination", "delete")
    if config.get0("destination", "rdn") is not None:
        logger.warning("setting rdn is currently ignored")

    options = Options()
    # the configured value was read, but never took effect before
    options.delete = destdelete

    excludesubtree = config.get0("destination", "excludesubtree")
    if excludesubtree is not None:
        options.exclude = excludesubtree.lower()

    options.updateonly = not config.getboolean("destination", "create", False)
    options.starttls = config.getboolean("destination", "starttls", False)
    options.renameattr = config.get0("destination", "detectRename", None)
    # NOTE(review): this used to read "detectRename" a second time, so the
    # attribute name ended up as command. The option name "renameCommand"
    # is an assumption - confirm.
    options.renamecommand = config.get0("destination", "renameCommand", None)
    options.pwd_max_days = int(config.get("source", "pwd_max_days", 0))
    options.filter = filterstr

    # Set source.attrlist as global option.
    # If source would use less attributes than dest,
    # all attributes not retrieved from source would be deleted from dest
    attributes = config.get0("source", "attributes")
    options.attrlist = attributes.split(",") if attributes is not None else None

    if config.get0('source', 'mode') == 'syncrepl':
        ldapsync = LdapSyncRepl(
            destsrv, destadmindn, destadminpw, basedn, destbasedn,
            options,
            source_ldap_url_obj=config.get_ldap_url_obj('source'))
        ldapsync.sync()
    else:
        if srcfile:
            objects = readLDIFSource(srcfile)
        else:
            objects = readLdapSource(srv, admindn, adminpw,
                                     basedn, filterstr, options.attrlist, starttls)

        ldapsync = LdapSync(destsrv, destadmindn, destadminpw, basedn, destbasedn, options)
        ldapsync.sync(objects)
Note: See TracBrowser for help on using the repository browser.