iis服务器助手广告广告
返回顶部
首页 > 资讯 > 后端开发 > Python >python 操作 openldap
  • 370
分享到

python 操作 openldap

操作pythonopenldap 2023-01-31 07:01:55 370人浏览 八月长安

Python 官方文档:入门教程 => 点击学习

摘要

# -*- coding: utf-8 -*- # author : s import random,string from ldap3 import Server,Connection,ALL,SUBTREE,ALL_ATTRIBUTE

# -*- coding: utf-8 -*-
# author : s

import random,string
from ldap3 import Server,Connection,ALL,SUBTREE,ALL_ATTRIBUTES,MODIFY_REPLACE,MODIFY_ADD,MODIFY_DELETE
from passlib.hash import  ldap_salted_sha1 as ssha
from app.utils import  ErrMsg

class LdapOp(object):

    """
    Operation Dcouments: Http://ldap3.readthedocs.io/

    """

    def __init__(self,ip,port,user,passwd):
        self._ip = ip
        self._port = port
        self._user = user
        self._passwd = passwd
        self.dn = self._user.split(',',1)[1]
        self.s = Server(self._ip,self._port,get_info=ALL)
        self._conn =  Connection(self.s, self._user, self._passwd, auto_bind=True)

    @property
    def conn(self):

        if not self._conn:
            print('ldap conn init ')
            self._conn =  Connection(self.s, self._user, self._passwd, auto_bind=True)

        return self._conn

    def searchAll(self,name='top'):
        entries = self.conn.extend.standard.paged_search(self.dn, '(objectClass=%s)' % name ,attributes=['cn', 'givenName','gidNumber','uidNumber','telephoneNumber','mail'])
        en = list(entries)

        ulist = [v for v in en if  'People' in v.get('dn','')  and  'uid' in v.get('dn','') ]

        l = []
        for v in ulist:
            udict = {}
            udict['dn'] = v['dn']
            udict['cn'] =  v['attributes'].get('cn','')[0]  if len(v['attributes'].get('cn','')) > 0 else v['attributes'].get('cn','')
            udict['gid'] = str(v['attributes'].get('gidNumber',''))
            udict['mail'] = v['attributes'].get('mail','')[0]  if len(v['attributes'].get('mail','')) > 0 else v['attributes'].get('mail','')
            udict['phone'] = v['attributes'].get('telephoneNumber','')[0]  if len(v['attributes'].get('telephoneNumber','')) > 0 else v['attributes'].get('telephoneNumber','')
            l.append(udict)

        # append user is groups
        groups = self.searchGroups()
        ##  添加 组 到 用户的连接

        for user in l:
            user['groups'] = []
            for group in groups:
                # group.get('memberUid',[])
                if user.get('cn', '') in group.get('memberUid', []):
                    user['groups'].append(group.get('cn'))
        #print(l)
        return l

    def getGroups(self):
        l = []

        return l


    def user_addGroup(self, group_dn, user_cn=[]):
        data = self.conn.modify(group_dn, {'memberuid': [(MODIFY_ADD, user_cn)]})
        return self.conn.entries

    def user_deleteGroup(self,group_dn,user_cn=[]):
        data = self.conn.modify(group_dn, {'memberuid': [(MODIFY_DELETE,user_cn)]})
        print(data)
        return self.conn.entries


    def addGroup(self,args):

        ouname = self.get_ouname('(|(ou=Groups)(ou=Group))')
        print('ouname=',ouname)
        groupname = args.get('name')
        groupid = args.get('groupid')
        #grouppassWord = args.get('grouppassword')

        cndn = 'cn=%s,%s' % (groupname,ouname)
        print(cndn)
        attr = {
            'objectClass': ['posixGroup', 'top'],
            'cn' : groupname,
            'gidNumber': groupid,
            #'userPassword': grouppassword
        }
        sgroup = [ g for g in self.searchGroups() if g.get('cn','') == groupname or g.get('gidNumber','') == groupid ]
        if sgroup:
            return {'msg': 'err', 'data': u' 组名字或组ID重复,请检查后重新添加!'}
        try:
            data = self.conn.add(cndn, attributes=attr)
            msg = 'ok'
        except Exception as e:

            data  = e
            msg = 'err'
        finally:
            #self.conn.unbind()
            return {'msg': msg , 'data': data}


    def searchGroups(self,groupname=''):

        if groupname:
            #print(self.conn)
            rv = self.conn.search('%s' % self.dn, '( &(objectClass=posixGroup)(cn=%s))' %groupname,
               attributes=['sn', 'cn', 'objectclass', 'userPassword', 'gidNumber','memberUid'])
        else:
            rv = self.conn.search('%s' % self.dn, '(objectClass=posixGroup)',
               attributes=['sn', 'cn', 'objectclass', 'userPassword', 'gidNumber','memberUid'])
        data = []

        if rv:
            for en in  self.conn.entries:
                endict = en.entry_get_attributes_dict()
                suben = {
                    'cn': str(en['cn'][0]),
                    'gidNumber': str(en['gidNumber'][0]),
                    #'objectClass': ','.join(en['objectClass']),
                    #'userPassword': en['userPassword'][0],
                    'dn': str(en.entry_get_dn()),
                    'memberUid': endict.get('memberUid',[]),
                }
                data.append(suben)

        return data


    def deleteGroup(self,name=''):

        if not name:
            return {'msg':'err','data': 'Group Name not is None'}

        group = self.searchGroups(name)

        if len(group)== 1:

            try:

                self.conn.delete(group[0]['dn'])

                rv =  {'msg':'ok','data':''}
            except Exception as e:
                rv =  {'msg': 'err','data': e}

        elif len(group) == 0 :
            rv = {'msg': 'err', 'data': 'Not group Find'}

        else:
            rv = {'msg': 'err', 'data': 'Find %d groups '} %len(group)

        return rv

    def addUser(self,args):

        cndn = 'uid=%s,ou=People,%s' % ( args.get('name'),self.dn)
        #object_class = [ 'account','posixAccount', 'top', 'shadowAccount']

        if args.get('mail',''):
            mail = args.get('mail')
        else:
            if self.dn == 'dc=test,dc=com':
                dn = 'test.com'
                mail = '%s@%s' % (args.get('name'), dn)
            else:
                dn ='.'.join(map(lambda x: x[3:],self.dn.split(',')))
                mail = '%s@%s' % (args.get('name'),dn)
        attr = {
            'objectClass': ['account', 'posixAccount', 'top', 'shadowAccount'],
            'userPassword': args.get('passwd'),
            'uid': args.get('name'),
            'cn': args.get('name'),
            'shadowLastChange': '17038',
            'shadowMax': '99999',
            'shadowWarning': '7',
            'uidNumber': args.get('phonenumber'),
            'gidNumber': args.get('groupid'),
            'homeDirectory': '/home/%s' % args.get('name'),
            'mail': mail,
            'telephoneNumber': args.get('phonenumber'),
            'displayname': args.get('name'),
        }
        #print('*' *100)
        #print(cndn)
        #print(attr)
        try:
            data = self.conn.add(cndn, attributes=attr)
            if not data:
                
                attr['objectClass'] = ['posixAccount', 'shadowAccount', 'top', 'person', 'organizationalPerson','inetOrgPerson']
                attr['sn'] = args.get('name')
          
                data = self.conn.add(cndn, attributes=attr)
                if not data:
                    
                    msg = self.conn.result['message']
                    raise ValueError(msg)
            msg = 'ok'
        except Exception as e:

            data  = e
            msg = 'err'
        finally:
            self.conn.unbind()
        return {'msg': msg , 'data': data}

    def deleteUser(self,name=''):
        if not name:
            return {'msg':'err','data': 'Name not is None'}
        if 'dc' in name:
            dn = name
        else:
            dn = 'uid=%s,ou=People,%s' %(name,self.dn)
        print(dn)
        try:
            self.conn.delete(dn)
            rv =  {'msg':'ok','data':''}
        except Exception as e:
            rv =  {'msg': 'err','data': e}
        finally:
            self.conn.unbind()
        return  rv

    def searchUser(self,name):
        try:
            users = self.searchAll()
            if name:
                data = [ user for user in  users if name in  user.get('cn') ]
            else:
                data = users
            rv = 'ok'
        except Exception as e:
            data = e
            rv = 'err'
        finally:
            return {'msg':rv,'data':data}

    def searchSinUser(self,name):
        try:
            ga = self.conn.extend.standard.paged_search(search_base=self.dn,
                                           search_filter='(uid=%s)' %name,
                                           search_scope= SUBTREE,
                                           attributes=ALL_ATTRIBUTES)
            user = list(ga)
            if user:
                data = user[0].get('attributes')

                d = {
                    'user': str(data.get('cn')[0]),
                    'passwd': data.get('userPassword')[0].decode('utf-8'),
                    'gid': data.get('gidNumber'),
                    'mail': data.get('mail')[0],
                    'phonenumber': data.get('uidNumber')

                }
                
            else:
                d = {}
            users = self.searchAll()
            usingle = [d for d in users if d.get('cn') == name ]
            u = {}
            if usingle:
                u = usingle[0]
        except Exception as e:
            print(e)
            u = {'msg': ErrMsg()}

        finally:
            #print('user=',u)
            x = u.update(d)
            print('u=',u)
            return u

    def editUser(self,args):
        try:
            cn = 'uid=%s,ou=People,%s' % (args.get('user'), self.dn)
            print(cn)
            passwd = args.get('passwd','')

            user = self.searchSinUser(args.get('user'))
            data = ''
            if user.get('passwd') != passwd:
                if 'SSHA' not in passwd:
                    passwd = self.encodePasswd(passwd)
                   
                    data = self.conn.modify(cn, {'userPassword': (MODIFY_REPLACE,passwd)})
            if user.get('gid') != args.get('gid') and  args.get('gid') != 'default':
                data = self.conn.modify(cn,{'gidNumber': (MODIFY_REPLACE,args.get('gid')) })

        except Exception as e:
            ErrMsg()
            data = e
        finally:
            print('data', data)
            return data


    def setPasswd(self,**kwargs):
        try:
            cn = 'uid=%s,ou=People,%s' % (kwargs.get('user',''), self.dn)
            data = self.conn.modify(cn,{'userPassword':(MODIFY_REPLACE,kwargs.get('passwd', ''))})
        except Exception as e:
            data = ErrMsg()
        finally:
            return data

    @staticmethod
    def GenPasswd():
        return ''.join(random.sample(string.ascii_letters + string.digits, 8))

    def _searchUser_org(self,username):
        print('in-search-org')
        ga = self.conn.extend.standard.paged_search(search_base=self.dn,
                                       search_filter='(uid=%s)' %username,
                                       search_scope= SUBTREE,
                                       attributes=ALL_ATTRIBUTES)
        user = list(ga)
        print(user)
        entry = self.conn.response[0]
        dn = entry['dn']
        attr_dict = entry['attributes']

        return (dn,attr_dict)

    def authUser(self,username,password):
        
        try:
            cn = 'uid=%s,ou=People,%s' % (username,self.dn)
            conn2 = Connection(self._ip, user=cn, password=password,
                               check_names=True, lazy=False, raise_exceptions=False)
            conn2.bind()
            if conn2.result["description"] == "success":
                rv = 'ok'
                data = '认证成功'
            else:
                rv = 'err'
                data = '认证失败,用户名或密码错误!'
        except Exception as e:
            rv = 'err'
            data = ErrMsg()
        finally:
            return {'rv': rv,'data':data}

    @staticmethod
    def encodePasswd(password=''):
        """
        :param password:
        :return:
        """
        if password:
            return ssha.encrypt(password,salt_size=12)
        return ''

    def get_ouname(self,ouname):
        """
         ouname = '(|(ou=Groups)(ou=Group))'

        :param ouname:
        :return:
        """
        print(self.dn,ouname)
        ou = self.conn.search('%s' % self.dn, '%s' % ouname )
        print(ou)
        retry = self.conn.entries
        num = len(retry)
        if num == 0:
            return []

        else:
            return retry[0].entry_get_dn()


--结束END--

本文标题: python 操作 openldap

本文链接: https://www.lsjlt.com/news/191106.html(转载时请注明来源链接)

有问题或投稿请发送至: 邮箱/279061341@qq.com    QQ/279061341

本篇文章演示代码以及资料文档资料下载

下载Word文档到电脑,方便收藏和打印~

下载Word文档
猜你喜欢
  • python 操作 openldap
    # -*- coding: utf-8 -*- # author : s import random,string from ldap3 import Server,Connection,ALL,SUBTREE,ALL_ATTRIBUTE...
    99+
    2023-01-31
    操作 python openldap
  • Python操作SQLLite(基本操作
      SQLite 是一个软件库,实现了自给自足的、无服务器的、零配置的、事务性的 SQL 数据库引擎。SQLite 是在世界上最广泛部署的 SQL 数据库引擎。SQLite 源代码不受版权限制。 Python SQLITE数据库是一款非常...
    99+
    2023-01-31
    操作 Python SQLLite
  • python 操作 K8S
     pip install kubernetes    mkdir deamon/config    cp $HOME/.kube/config  deamon/config/kubeconfig.yamlfrom kubernetes im...
    99+
    2023-01-31
    操作 python K8S
  • python 操作excel
    python 读写 excel 有好多选择,但是,方便操作的库不多,在我尝试了几个库之后,我觉得两个比较方便的库分别是 xlrd/xlwt、openpyxl。 之所以推荐两个库是因为这两个库分别操作的是不同版本的 excel,xlrd 操作...
    99+
    2023-01-31
    操作 python excel
  • Python 操作json
    Json语法规则: 数据在名称/值对中 数据由逗号分隔 花括号保存对象 方括号保存数组 Json字符串本质上是一个字符串,用单引号表示 Json数据的书写格式 名称--值对,包括名称字段(在双引号中),后面跟一个冒号,然后是值: “na...
    99+
    2023-01-31
    操作 Python json
  • python 操作outlook
    # -*- coding: utf-8 -*- from win32com.client import constants from win32co...
    99+
    2023-01-31
    操作 python outlook
  • python操作solr
    solr接收http请求,所以使用requests库操作solr就可以 添加 data = {"add": {"doc": params, "commitWithin": 1000}} headers = {"Co...
    99+
    2023-01-31
    操作 python solr
  • Python操作符
    运算操作符+_*/% 取余// 除法取整** 幂运算例a = 3a += 2a → 5b = 4b -= 1b→310/8 → 1.2510//8 → 110 % 3 → 16 % 3 → 0逻辑操作符andornot...
    99+
    2023-01-31
    操作 Python
  • Python操作redis
    Python操作redispythonredis数据库searchimport首先确保redis已经正常启动。安装   可以去pypi上找到redis的Python模块:   http://pypi.python.org/pypi%3Aac...
    99+
    2023-01-31
    操作 Python redis
  • Python操作Mongodb
    一 导入 pymongo from pymongo import MongoClient 二 连接服务器 端口号 27017 连接MongoDB连接MongoDB我们需要使用PyMongo库里面的MongoClient,一般来说传入Mong...
    99+
    2023-01-31
    操作 Python Mongodb
  • python 操作kafka
    https://pypi.python.org/pypi/pykafka 最近项目中总是跟java配合,我一个写python的程序员,面对有复杂数据结构的java代码转换成python代码,确实是一大难题,有时候或多或少会留有一点...
    99+
    2023-01-31
    操作 python kafka
  • python操作mysql
    # rpm -qa |grep MySQL-python 查询是否有mysqldb库MySQL-python-1.2.3-0.3.c1.1.el6.x86_64>>> import MySQLdb #导入mysqldb模块...
    99+
    2023-01-31
    操作 python mysql
  • python操作Excel
    import xlrd import xlwt import xlutils import win32com#xlrd#打开exceldata = xlrd.open_workbook("I+P.xls")#查看文件中包含sheet的名称s...
    99+
    2023-01-31
    操作 python Excel
  • Python操作xml
    Xml XML指可扩展标记语言(Extensible Markup Language) XML被设计用于结构化、存储和传输数据 XML是一种标记语言,很类似于HTML XML没有像HTML那样具有预定义标签,需要程序员自定义标签。 ...
    99+
    2023-01-31
    操作 Python xml
  • python操作memcache
    Memcached是一个高性能的分布式内存对象缓存系统,用于动态Web应用以减轻数据库负载。它通过在内存中缓存数据和对象减少读取数据库的次数,从而提高动态、数据库驱动网站的速度。Memcached基于一个存储键/值的hash map。其守护...
    99+
    2023-01-31
    操作 python memcache
  • Python--URL操作
    目标URL:http://127.0.0.1:5000/oauth/authorizeredirect_uri=http%3A%2F%2F127.0.0.1%3A5000%2Fcallback%2F%3FskillId%3D189...
    99+
    2023-01-31
    操作 Python URL
  • Python操作Elasticsearc
    描述:ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。下面介绍了利用Python API接口进行数据查询,方便其他系统的调用。注:此文仅做笔记参考 ...
    99+
    2023-01-31
    操作 Python Elasticsearc
  • Python操作MSSQL
      Python连接SQL Server数据库 - pymssql使用基础:https://www.cnblogs.com/baiyangcao/p/pymssql_basic.html 廖雪峰官网 之 Python 访问数据库(SQLL...
    99+
    2023-01-31
    操作 Python MSSQL
  • python excel操作
    首先在python3.5里,我们可以使用新的python类库,来支持3.x之后的读写excel 针对 03版excel(xls结尾的),我们可以使用xlrd读,xlwt包来写 针对 07版excel(xlsx结尾的),我们可以使用openp...
    99+
    2023-01-31
    操作 python excel
  • python操作sqlserver
    # coding=gbkimport sys  import pymssql#尝试数据库连接try:    conn = pymssql.connect(host="192.168.1.43",user="sa",password="sa0...
    99+
    2023-01-31
    操作 python sqlserver
软考高级职称资格查询
编程网,编程工程师的家园,是目前国内优秀的开源技术社区之一,形成了由开源软件库、代码分享、资讯、协作翻译、讨论区和博客等几大频道内容,为IT开发者提供了一个发现、使用、并交流开源技术的平台。
  • 官方手机版

  • 微信公众号

  • 商务合作