source: dassldapsync/dassldapsync.py@ 1255

Last change on this file since 1255 was 1221, checked in by joergs, on Nov 16, 2016 at 4:18:48 PM

added syncrepl

  • Property svn:executable set to *
File size: 23.2 KB
Line 
1#!/usr/bin/python
2# vim: tabstop=4 expandtab shiftwidth=4 softtabstop=4
3
4# core modules
5import argparse
6import ConfigParser
7import logging
8from pprint import pprint
9import signal
10import subprocess
11import sys
12import time
13
14# external modules
15import datetime
16import dateutil.parser
17import dateutil.tz
18import ldap
19from ldap.ldapobject import ReconnectLDAPObject
20import ldap.modlist
21from ldap.syncrepl import SyncreplConsumer
22import ldapurl
23import ldif
24
25
26
27def getArguments():
28 configfile = '/etc/dassldapsync.conf'
29 parser = argparse.ArgumentParser(description='Synchronize the content of two LDAP servers.')
30 parser.add_argument('-d', '--debug', action='store_true', help="enable debug output")
31 parser.add_argument('configfile', default=configfile,
32 help="Configuration file [default: {}]".format(configfile))
33 return parser.parse_args()
34
35
36class Options(object):
37 def __init__(self):
38 self.delete = True
39 self.starttls = False
40 self.updateonly = False
41 self.filter = None
42 self.attrlist = None
43 self.exclude = None
44 self.renameattr = None
45 self.renamecommand = None
46 self.pwd_max_days = 0
47
48class ConfigParserDefaults(ConfigParser.ConfigParser, object):
49 def get(self, section, option, default=None):
50 try:
51 result = super(self.__class__, self).get(section, option)
52 except ConfigParser.NoOptionError:
53 if default is None:
54 raise
55 else:
56 result = default
57 return result
58
59 def get_section(self, section):
60 if section in self._sections:
61 return self._sections[section]
62
63 def get0(self, section, option, default=None):
64 try:
65 result = super(self.__class__, self).get(section, option)
66 except ConfigParser.NoOptionError:
67 result = default
68 return result
69
70 def getboolean(self, section, option, default=None):
71 try:
72 result = super(self.__class__, self).getboolean(section, option)
73 except ConfigParser.NoOptionError:
74 if default is None:
75 raise
76 else:
77 result = default
78 return result
79
80 def get_ldap_url_obj(self, section):
81 baseurl = 'ldap://{server}:389/{basedn}'.format(**(self.get_section(section)))
82 attrs = None
83 if self.get0(section, 'attributes') is not None:
84 attrs = self.get(section, 'attributes').split(',')
85 return ldapurl.LDAPUrl(
86 baseurl,
87 dn=self.get(section, 'baseDn', ''),
88 who=self.get0(section, 'bindDn'),
89 cred=self.get0(section, 'basePassword'),
90 filterstr=self.get0(section, 'filter'),
91 attrs=attrs
92 )
93
94
95def readLDIFSource(path):
96 logger = logging.getLogger()
97 logger.info("reading LDAP objects from file {}".format(path))
98 with open(path, 'r') as f:
99 parser = ldif.LDIFRecordList(f)
100 parser.parse()
101 result = parser.all_records
102 return result
103
104def readLdapSource(server, binddn, bindpw, basedn, filterstr, attrlist=None, starttls=False):
105 logger = logging.getLogger()
106 logger.info("reading LDAP objects from server {}".format(server))
107 con = ldap.open(server, port=389)
108 if starttls:
109 con.start_tls_s()
110 con.simple_bind_s(binddn, bindpw)
111 results = con.search_s(basedn, ldap.SCOPE_SUBTREE, filterstr, attrlist)
112 return results
113
114class LdapSync(object):
115 def __init__(self, destserver,
116 destbinddn, destbindpw,
117 srcbasedn, destbasedn, options=Options()):
118 self.logger = logging.getLogger()
119
120 self.destserver = destserver
121 self.destbasedn = destbasedn
122 self.destbinddn = destbinddn
123 self.destbindpw = destbindpw
124 self.options = options
125
126 self.srcbasedn = srcbasedn
127
128 self.con = None
129
130 self.attrmap = ldap.cidict.cidict({})
131 self.classmap = {}
132
133 self.junk_attrs = ["memberof", "modifiersname", "modifytimestamp", "entryuuid",
134 "entrycsn", "contextcsn", "creatorsname", "createtimestamp",
135 "structuralobjectclass", "pwdchangedtime", "pwdfailuretime"]
136
137 self.reset_result()
138
139
140 def reset_result(self):
141 self.result = {
142 'add': {'ok': [], 'failed': []},
143 'update': {'ok': [], 'failed': []},
144 'delete': {'ok': [], 'failed': []},
145 }
146
147
148 def _dest_ldap_connect(self):
149 if self.con is None:
150 self.logger.info("connect to destination LDAP server {}".format(self.destserver))
151 self.con = ldap.open(self.destserver, port=389)
152 if self.options.starttls:
153 self.con.start_tls_s()
154 self.con.simple_bind_s(self.destbinddn, self.destbindpw)
155
156 def __adapt_dn(self, dn):
157 # move LDAP object to dest base
158 if self.srcbasedn != self.destbasedn:
159 dn_old = dn
160 rpath = dn[:-len(self.srcbasedn)]
161 dn = rpath+self.destbasedn
162 self.logger.debug("moved {} to {}".format(dn_old, dn))
163 # print "dn:",dn,"src:",srcbasedn,"rpath:",rpath,"dest:",destbasedn
164 return dn
165
166 def __is_dn_included(self, dn):
167 if self.options.exclude is None:
168 return True
169 if dn.lower().endswith(self.options.exclude):
170 return False
171 return True
172
173 def __adapt_source_ldap_objects(self, searchresult):
174 """
175 Do configured modification to the source LDAP objects.
176 """
177 self.logger.debug("modifying LDAP objects retrieved from source LDAP")
178
179 update_objects = []
180
181 for r in searchresult:
182 dn = self.__adapt_dn(r[0])
183 d = ldap.cidict.cidict(r[1])
184
185 if self.__is_dn_included(dn):
186 objectclasses = d["objectclass"]
187
188 newObjectclasses = []
189 for o in objectclasses:
190 if o.lower() in self.classmap:
191 new_oc = self.classmap[o.lower()]
192 if new_oc not in newObjectclasses:
193 newObjectclasses.append(new_oc)
194 else:
195 if o not in newObjectclasses:
196 newObjectclasses.append(o)
197
198 d["objectclass"] = newObjectclasses
199
200 for a in d.keys():
201 attr = a
202 if self.attrmap.has_key(a.lower()):
203 attr = self.attrmap[attr].lower()
204 if attr.lower() != a.lower():
205 values = d[a]
206 del d[a]
207 d[attr] = values
208
209 update_objects.append((dn, d))
210 return update_objects
211
212
213 def _get_dest_entry(self, dn, entry):
214 """
215 In the destination LDAP, the objects should be named
216 according to options.renameattr.
217 """
218 attrlist = self.options.attrlist
219
220 existingDestDn = None
221 existingDestEntry = None
222 if self.options.renameattr and entry.has_key(self.options.renameattr):
223 searchresult = self.con.search_s(
224 self.destbasedn,
225 ldap.SCOPE_SUBTREE,
226 '%s=%s' % (self.options.renameattr, entry[self.options.renameattr][0]), attrlist)
227 if searchresult is not None and len(searchresult) > 0:
228 existingDestDn, existingDestEntry = searchresult[0]
229 if existingDestDn.lower() != dn.lower():
230 self.con.modrdn_s(existingDestDn, dn)
231 self.notify_renamed(existingDestDn, dn,
232 existingDestEntry[self.options.renameattr][0],
233 entry[self.options.renameattr][0],
234 options)
235 if existingDestDn is None:
236 searchresult = self.con.search_s(dn, ldap.SCOPE_BASE, 'objectclass=*', attrlist)
237 existingDestDn, existingDestEntry = searchresult[0]
238 return (existingDestDn, existingDestEntry)
239
240
241 def __handle_pwdAccountLockedTime(self, dn, entry, now, max_age):
242 # hack for syncing accounts locked by password policy
243 do_unlock = False
244 if self.options.pwd_max_days > 0 and entry.has_key('pwdChangedTime'):
245 # print "pwdChangedTime set for",dn
246 pwdChange = entry['pwdChangedTime'][0]
247 d = dateutil.parser.parse(pwdChange)
248 if (now-d) > max_age:
249 entry['pwdAccountLockedTime'] = ['000001010000Z']
250 self.logger.info("locking {} {}".format(dn, pwdChange))
251 else:
252 # pwdAccountLockedTime is a operational attribute,
253 # and therefore not part of entry.
254 # Do extra search to retrieve attribute.
255 searchresult = self.con.search_s(
256 dn, ldap.SCOPE_BASE,
257 "objectclass=*", attrlist=['pwdAccountLockedTime'])
258 tmp_dn, tmp_entry = searchresult[0]
259 if tmp_entry.has_key('pwdAccountLockedTime'):
260 do_unlock = True
261 return do_unlock
262
263
264 def _syncLdapObject(self, srcDn, srcAttributes):
265 tzutc = dateutil.tz.gettz('UTC')
266 now = datetime.datetime.now(tzutc)
267 max_age = datetime.timedelta(days=self.options.pwd_max_days)
268
269 try:
270 destDn, destAttributes = self._get_dest_entry(srcDn, srcAttributes)
271
272 # hack for syncing accounts locked by password policy
273 do_unlock = self.__handle_pwdAccountLockedTime(srcDn, srcAttributes, now, max_age)
274
275 mod_attrs = ldap.modlist.modifyModlist(destAttributes, srcAttributes)
276
277 # hack for unlocking, see above
278 if do_unlock:
279 self.logger.info("unlocking {} {}".format(destDn, 'pwdAccountLockedTime'))
280 mod_attrs.append((ldap.MOD_DELETE, 'pwdAccountLockedTime', None))
281
282 if self.options.attrlist is not None:
283 mod_attrs = [a for a in mod_attrs if a[1].lower() in self.options.attrlist]
284
285 if self.junk_attrs is not None:
286 mod_attrs = [a for a in mod_attrs if a[1].lower() not in self.junk_attrs]
287
288 if mod_attrs:
289 try:
290 self.logger.debug('mod_attrs: ' + str(mod_attrs))
291 self.con.modify_s(srcDn, mod_attrs)
292 self.notify_modified(srcDn)
293 except:
294 self.logger.exception('modify failed')
295 self.notify_modified(srcDn, False)
296
297 except ldap.NO_SUCH_OBJECT:
298 if not self.options.updateonly:
299 try:
300 self.con.add_s(srcDn, ldap.modlist.addModlist(srcAttributes, self.junk_attrs))
301 self.notify_created(srcDn)
302 except (ldap.OBJECT_CLASS_VIOLATION,
303 ldap.NO_SUCH_OBJECT,
304 ldap.CONSTRAINT_VIOLATION):
305 self.notify_created(srcDn, False)
306
307
308 def __syncLdapDestination(self, update_objects):
309
310 logger.debug("writing data to destination LDAP")
311 for obj in update_objects:
312 dn, entry = obj
313 self._syncLdapObject(dn, entry)
314
315
316 def __deleteDestLdapObjects(self, update_objects):
317 """
318 Remove all LDAP objects in destination LDAP server
319 that did not come from the source LDAP objects
320 and are not excluded.
321 """
322
323 searchresult = self.con.search_s(self.destbasedn, ldap.SCOPE_SUBTREE, self.options.filter)
324 existing = [x[0].lower() for x in searchresult]
325
326 morituri = existing
327
328 if self.destbasedn.lower() in existing:
329 morituri.remove(self.destbasedn.lower())
330
331 for obj in update_objects:
332 dn, entry = obj
333 if dn.lower() in existing:
334 morituri.remove(dn.lower())
335 for dn in morituri:
336 if self.__is_dn_included(dn):
337 try:
338 self.con.delete_s(dn)
339 self.notify_deleted(dn)
340 except:
341 self.notify_deleted(dn, False)
342
343
344 def sync(self, searchresult):
345 """
346 Synchronize entries from searchresult to destination LDAP server.
347 """
348 if len(searchresult) == 0:
349 self.logger.error("empty source, aborting")
350 return
351
352 self._dest_ldap_connect()
353
354 update_objects = self.__adapt_source_ldap_objects(searchresult)
355 self.__syncLdapDestination(update_objects)
356 if self.options.delete and not self.options.updateonly:
357 self.__deleteDestLdapObjects(update_objects)
358 self.con.unbind()
359
360 self.__log_summary(True)
361
362
363 def __log_summary(self, show_failed=True, show_ok=False):
364 result = self.result
365 for action in result.keys():
366 ok = len(result[action]['ok'])
367 failed = len(result[action]['failed'])
368 print "{} (ok: {}, failed: {}):".format(action, ok, failed)
369
370 if show_ok and ok > 0:
371 print "succeeded:"
372 print "\n".join(result[action]['ok'])
373
374 if show_failed and failed > 0:
375 print "failed:"
376 print "\n".join(result[action]['failed'])
377
378 def get_short_dn(self, dn):
379 return dn.lower().replace(',' + self.srcbasedn.lower(), '')
380
381 def notify_created(self, dn, ok=True):
382 if ok:
383 logger.debug('{} created'.format(self.get_short_dn(dn)))
384 self.result['add']['ok'].append(dn)
385 else:
386 self.logger.warning("failed to add {}".format(dn))
387 self.result['add']['failed'].append(dn)
388
389 def notify_modified(self, dn, ok=True):
390 if ok:
391 logger.debug('{} modified'.format(self.get_short_dn(dn)))
392 self.result['update']['ok'].append(dn)
393 else:
394 self.logger.error("failed to modify {}".format(dn))
395 self.result['update']['failed'].append(dn)
396
397 def notify_deleted(self, dn, ok=True):
398 if ok:
399 logger.debug('{} deleted'.format(self.get_short_dn(dn)))
400 self.result['delete']['ok'].append(dn)
401 else:
402 self.logger.error("failed to delete {}".format(dn))
403 self.result['delete']['failed'].append(dn)
404
405 def notify_renamed(self, dn, newdn, uid, newuid, options):
406 print "renamed", dn, newdn
407 subprocess.check_call(
408 "%s %s %s %s %s" % (options.renamecommand, dn, newdn, uid, newuid),
409 shell=True)
410
411
412
413class SyncReplConsumer(ReconnectLDAPObject, SyncreplConsumer):
414 """
415 Syncrepl Consumer interface
416 """
417
418 def __init__(self, dest, syncrepl_entry_callback, *args, **kwargs):
419 self.logger = logging.getLogger()
420 # Initialise the LDAP Connection first
421 ldap.ldapobject.ReconnectLDAPObject.__init__(self, *args, **kwargs)
422 # We need this for later internal use
423 self.__presentUUIDs = dict()
424 self.cookie = None
425 self.dest_ldap = dest
426 self.syncrepl_entry_callback = syncrepl_entry_callback
427
428 def syncrepl_get_cookie(self):
429 return self.cookie
430
431 def syncrepl_set_cookie(self, cookie):
432 self.cookie = cookie
433
434 def syncrepl_entry(self, dn, attributes, uuid):
435 # First we determine the type of change we have here
436 # (and store away the previous data for later if needed)
437 if uuid in self.__presentUUIDs:
438 change_type = 'modify'
439 else:
440 change_type = 'add'
441 # Now we store our knowledge of the existence of this entry
442 self.__presentUUIDs[uuid] = dn
443 # Debugging
444 logger.debug('{}: {} ({})'.format(dn, change_type, ",".join(attributes.keys())))
445 # If we have a cookie then this is not our first time being run,
446 # so it must be a change
447 if self.cookie is not None:
448 self.syncrepl_entry_callback(dn, attributes)
449
450
451 def syncrepl_delete(self, uuids):
452 """ syncrepl_delete """
453 # Make sure we know about the UUID being deleted, just in case...
454 uuids = [uuid for uuid in uuids if uuid in self.__presentUUIDs]
455 # Delete all the UUID values we know of
456 for uuid in uuids:
457 logger.debug('detected deletion of entry {} ({})', uuid, self.__presentUUIDs[uuid])
458 del self.__presentUUIDs[uuid]
459
460 def syncrepl_present(self, uuids, refreshDeletes=False):
461 """ called on initial sync """
462 if uuids is not None:
463 self.logger.debug('uuids: {}'.format(','.join(uuids)))
464 # If we have not been given any UUID values,
465 # then we have recieved all the present controls...
466 if uuids is None:
467 # We only do things if refreshDeletes is false as the syncrepl
468 # extension will call syncrepl_delete instead when it detects a
469 # delete notice
470 if not refreshDeletes:
471 deletedEntries = [
472 uuid for uuid in self.__presentUUIDs
473 ]
474 self.syncrepl_delete(deletedEntries)
475 # Phase is now completed, reset the list
476 self.__presentUUIDs = {}
477 else:
478 # Note down all the UUIDs we have been sent
479 for uuid in uuids:
480 self.__presentUUIDs[uuid] = True
481
482
483 def syncrepl_refreshdone(self):
484 self.logger.info('Initial synchronization is now done, persist phase begins')
485 #self.logger.debug('UUIDs:\n' + '\n'.join(self.__presentUUIDs))
486
487
488
489class LdapSyncRepl(LdapSync):
490 def __init__(self, destsrv,
491 destadmindn, destadminpw,
492 basedn, destbasedn,
493 options=Options(), source_ldap_url_obj=None):
494 # Install our signal handlers
495 signal.signal(signal.SIGTERM, self.shutdown)
496 self.watcher_running = False
497 self.source_ldap_url_obj = source_ldap_url_obj
498 self.ldap_credentials = False
499 self.source_ldap_connection = None
500 super(LdapSyncRepl, self).__init__(destsrv,
501 destadmindn, destadminpw,
502 basedn, destbasedn, options)
503
504
505 def sync(self):
506 self._dest_ldap_connect()
507 self.watcher_running = True
508 while self.watcher_running:
509 self.logger.info('Connecting to source LDAP server')
510 # Prepare the LDAP server connection (triggers the connection as well)
511 self.source_ldap_connection = SyncReplConsumer(self.con,
512 self.perform_application_sync_callback,
513 self.source_ldap_url_obj.initializeUrl())
514
515 if self.source_ldap_url_obj.who and self.source_ldap_url_obj.cred:
516 self.ldap_credentials = True
517 # Now we login to the LDAP server
518 try:
519 self.source_ldap_connection.simple_bind_s(
520 self.source_ldap_url_obj.who, self.source_ldap_url_obj.cred)
521 except ldap.INVALID_CREDENTIALS, e:
522 print 'Login to LDAP server failed: ', str(e)
523 sys.exit(1)
524 except ldap.SERVER_DOWN:
525 print 'LDAP server is down, going to retry.'
526 time.sleep(5)
527 continue
528
529 # Commence the syncing
530 self.logger.info('Staring sync process')
531 ldap_search = self.source_ldap_connection.syncrepl_search(
532 self.source_ldap_url_obj.dn or '',
533 self.source_ldap_url_obj.scope or ldap.SCOPE_SUBTREE,
534 mode='refreshAndPersist',
535 attrlist=self.source_ldap_url_obj.attrs,
536 filterstr=self.source_ldap_url_obj.filterstr or '(objectClass=*)'
537 )
538
539 try:
540 while self.source_ldap_connection.syncrepl_poll(all=1, msgid=ldap_search):
541 print ".",
542 except KeyboardInterrupt:
543 # User asked to exit
544 print "aborted\n"
545 self.shutdown(None, None)
546 except Exception, e:
547 # Handle any exception
548 if self.watcher_running:
549 self.logger.exception('Encountered a problem, going to retry.')
550 time.sleep(5)
551
552 def perform_application_sync_callback(self, dn, attributes):
553 logger.debug('{}: src: {}'.format(dn, str(attributes)))
554 try:
555 self._syncLdapObject(dn, attributes)
556 except ldap.NO_SUCH_OBJECT:
557 self.logger.info("SKIPPED: {} object does not exist on target".format(dn))
558 return False
559 return True
560
561 def shutdown(self, signum, stack):
562 # Declare the needed global variables
563 self.logger.info('Shutting down!')
564
565 # We are no longer running
566 self.watcher_running = False
567
568
569if __name__ == "__main__":
570 logging.basicConfig(format='%(levelname)s %(module)s.%(funcName)s: %(message)s', level=logging.INFO)
571 logger = logging.getLogger()
572
573 args = getArguments()
574 if args.debug:
575 logger.setLevel(logging.DEBUG)
576 conffile = args.configfile
577
578 exclude = None
579
580 config = ConfigParserDefaults()
581 config.read(conffile)
582
583 srcfile = None
584 try:
585 srcfile = config.get("source", "file")
586 except:
587 pass
588
589 basedn = config.get("source", "baseDn")
590 filterstr = config.get0("source", "filter", None)
591
592 if srcfile is None:
593 srv = config.get("source", "server")
594 admindn = config.get("source", "bindDn")
595 adminpw = config.get("source", "bindPassword")
596 starttls = config.getboolean("source", "starttls")
597
598 destsrv = config.get("destination", "server")
599 destadmindn = config.get("destination", "bindDn")
600 destadminpw = config.get("destination", "bindPassword")
601 destbasedn = config.get("destination", "baseDn")
602 destdelete = config.getboolean("destination", "delete")
603 try:
604 rdn = config.get("destination", "rdn")
605 logger.warning("setting rdn is currently ignored")
606 except:
607 pass
608
609 options = Options()
610 try:
611 options.exclude = config.get("destination", "excludesubtree").lower()
612 except:
613 pass
614
615 options.updateonly = not config.getboolean("destination", "create", False)
616 options.starttls = config.getboolean("destination", "starttls", False)
617 options.renameattr = config.get0("destination", "detectRename", None)
618 options.renamecommand = config.get0("destination", "detectRename", None)
619 options.pwd_max_days = int(config.get("source", "pwd_max_days", 0))
620 options.filter = filterstr
621
622 # Set source.attrlist as global option.
623 # If source would use less attributes than dest,
624 # all attributes not retrieved from source would be deleted from dest
625 try:
626 options.attrlist = config.get("source", "attributes").split(",")
627 except:
628 options.attrlist = None
629
630 if config.get0('source', 'mode') == 'syncrepl':
631 ldapsync = LdapSyncRepl(
632 destsrv, destadmindn, destadminpw, basedn, destbasedn,
633 options,
634 source_ldap_url_obj=config.get_ldap_url_obj('source'))
635 ldapsync.sync()
636 else:
637 if srcfile:
638 objects = readLDIFSource(srcfile)
639 else:
640 objects = readLdapSource(srv, admindn, adminpw,
641 basedn, filterstr, options.attrlist, starttls)
642
643 ldapsync = LdapSync(destsrv, destadmindn, destadminpw, basedn, destbasedn, options)
644 ldapsync.sync(objects)
Note: See TracBrowser for help on using the repository browser.