目錄
- 1、注冊端
- 2、回調(diào)端:以下示例代碼為python2,django 框架
釘釘API文檔:https://ding-doc.dingtalk.com/doc#/serverapi2/skn8ld
釘釘有回調(diào)事件流程,有哪些回調(diào)?比如:通訊錄回調(diào)、審批回調(diào)等等,拿通訊錄回調(diào)來說,就是當(dāng)你公司組織架構(gòu)發(fā)生變動時(shí),會自動觸發(fā)你自己注冊的回調(diào)地址,然后根據(jù)回調(diào)信息做一些自定義的處理,不得不說,釘釘真的是解決了協(xié)同辦公的很多問題,非常nice,但就回調(diào)事件來說,每個(gè)企業(yè)只能注冊一個(gè)回調(diào)地址,即使你要監(jiān)聽的是不同的事件、即使還有其他業(yè)務(wù)線需要用到回調(diào),也只能不多于一個(gè)回調(diào),當(dāng)然這都是出于人家服務(wù)多方面考慮的設(shè)計(jì),廢話不多說,
好,流程:
一個(gè)注冊端向釘釘發(fā)送注冊請求,釘釘callback你自己的服務(wù)url,必須是公網(wǎng)訪問的地址,并且該地址返回釘釘json數(shù)據(jù)來告訴釘釘回調(diào)成功
注冊成功之后這個(gè)地址就可以接收釘釘?shù)幕卣{(diào)post請求,做一些自定義處理,記得返回json
好,代碼
1、注冊端
mport requests, json
class DingDingCallBack(object):
def __init__(self):
self.appsecret=''
self.appkey=''
self.api_url = "https://oapi.dingtalk.com/gettoken?appkey=%sappsecret=%s"%(self.appkey,self.appsecret)
self.aes_key = ""
self.callbackUrl = "回調(diào)url"
self.headers = {
'Content-Type': 'application/json',
'User-Agent': 'Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.96 Safari/537.36',
'username':'zhangsan',
'password':'123456'
}
def get_token(self):
res = requests.get(self.api_url)
if res.status_code == 200:
str_res = res.text
token = (json.loads(str_res)).get('access_token')
return token
def regist_call_back(self):
url = 'https://oapi.dingtalk.com/call_back/register_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
data = {
"call_back_tag": ["bpms_task_change", "bpms_instance_change"], # 審批回調(diào)
"token": "qxN3cm",
"aes_key": self.aes_key,
"url":self.callbackUrl,
}
data1 = json.dumps(data)
response = requests.post(url, headers=self.headers, data=data1)
print(response)
print(response.text)
def query_callback(self):
url = 'https://oapi.dingtalk.com/call_back/get_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
response = requests.get(url, headers=self.headers, )
print(response)
print(response.text)
def update_call_back(self):
url = 'https://oapi.dingtalk.com/call_back/update_call_back?access_token={}'.format(self.get_token())
data = {
"call_back_tag": ["bpms_task_change", "bpms_instance_change","user_add_org","user_leave_org","org_dept_create","org_dept_modify","org_dept_remove"],
"token": "自定義字符串",
"aes_key": self.aes_key,
"url":self.callbackUrl
}
data1 = json.dumps(data)
response = requests.post(url, headers=self.headers, data=data1)
print(response)
print(response.text)
def get_fail(self):
url = 'https://oapi.dingtalk.com/call_back/get_call_back_failed_result?access_token={}'.format(self.get_token())
response = requests.get(url, headers=self.headers, )
print(response.text)# todo 刪除回調(diào)
def delete_callback(self):
url = 'https://oapi.dingtalk.com/call_back/delete_call_back?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
result = requests.get(url)
print(result)
print(result.text)
#
if __name__ == '__main__':
dingOBJ = DingDingCallBack()
# todo 獲取釘釘token
# # dingOBJ.get_token()
# todo 獲取回調(diào)失敗
# dingOBJ.get_fail()
# todo 注冊回調(diào)
# dingOBJ.regist_call_back()
# todo 查詢回調(diào)事件
dingOBJ.query_callback()
# todo 修改回調(diào)事件
# dingOBJ.update_call_back()
# todo 刪除回調(diào)
# dingOBJ.delete_callback()
2、回調(diào)端:以下示例代碼為python2,django 框架
# -*- coding:utf-8 -*-
from django.shortcuts import render
from django.http import JsonResponse,HttpResponse
from django.views.generic import View
from django.views.decorators.csrf import csrf_exempt
from DingCrypto import DingCrypto
import random,string,time,json,requests,simplejson
# Create your views here.from ast import literal_eval
encodingAesKey = ''
key = ''
token = '自定義字符串'
appsecret = ''
appkey = ''
api_url = "https://oapi.dingtalk.com/gettoken?appkey=%sappsecret=%s"%(appkey,appsecret)
def randam_string(n):
ran_str = ''.join(random.sample(string.ascii_letters + string.digits, n))
return ran_str
# 注冊回調(diào)
@csrf_exempt
def workOrderCallback(request):
if request.method == 'GET':
return JsonResponse({'code':200,'msg':'ok'})
if request.method == 'POST':
print request.GET
dingCrypto = DingCrypto(encodingAesKey,key)
nonce = randam_string(8)
timestamp = str(int(round(time.time())))
encrpyt = dingCrypto.encrypt('success')
# print nonce,timestamp,token,encrpyt
signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt)
new_data = {
'data': {
'msg_signature': signature,
'timeStamp': timestamp,
'nonce': nonce,
'encrypt': encrpyt
},
}
encrpyt11 = dingCrypto.decrypt(encrpyt)
return JsonResponse(new_data.get('data'))
# 響應(yīng)回調(diào)
class CallBack(View):
def get(self,request,*args,**kwargs):
return JsonResponse({'code':200,'msg':'ok'})
def get_token(self):
res = requests.get(api_url)
if res.status_code == 200:
str_res = res.text
token = (json.loads(str_res)).get('access_token')
return token
def post(self,request,*args,**kwargs):
# data = request.GET
dingCrypto = DingCrypto(encodingAesKey, key)
nonce = randam_string(8)
timestamp = str(int(round(time.time())))
encrpyt = dingCrypto.encrypt('success')
# signature = dingCrypto.generateSignature(nonce=nonce,timestamp=timestamp,token=token,msg_encrypt=encrpyt)
callback = json.loads(dingCrypto.decrypt(json.loads(request.body).get('encrypt')))
if callback['EventType'] == 'bpms_instance_change': # 審批實(shí)例開始,結(jié)束
url = 'https://oapi.dingtalk.com/topapi/processinstance/get?access_token={ACCESS_TOKEN}'.format(ACCESS_TOKEN=self.get_token())
instace_ = {
"process_instance_id": callback['processInstanceId']
}
data2 = json.dumps(instace_)
req = requests.post(url,data=data2)
data = literal_eval(str(req.text)).get('process_instance')
excute_workorder(callback['processInstanceId'],data)
elif callback['EventType'] == 'bpms_task_change': # 審批任務(wù)開始,結(jié)束,轉(zhuǎn)交
print 'bpms_task_change'
elif callback['EventType'] == 'user_add_org':
print '用戶增加'elif callback['EventType'] == 'user_leave_org':
print '用戶離職'
elif callback['EventType'] == 'org_dept_create':
print '組織架構(gòu)添加'elif callback['EventType'] == 'org_dept_modify':
print '組織架構(gòu)變更'elif callback['EventType'] == 'org_dept_remove':
print '組織架構(gòu)刪除'return HttpResponse(encrpyt)
到此這篇關(guān)于python注冊釘釘回調(diào)事件的實(shí)現(xiàn)的文章就介紹到這了,更多相關(guān)python注冊釘釘回調(diào)事件內(nèi)容請搜索腳本之家以前的文章或繼續(xù)瀏覽下面的相關(guān)文章希望大家以后多多支持腳本之家!
您可能感興趣的文章:- Python實(shí)現(xiàn)釘釘發(fā)送報(bào)警消息的方法
- python3實(shí)現(xiàn)釘釘消息推送的方法示例
- python 調(diào)用釘釘機(jī)器人的方法
- Python調(diào)用釘釘自定義機(jī)器人的實(shí)現(xiàn)
- python釘釘機(jī)器人運(yùn)維腳本監(jiān)控實(shí)例
- Python制作釘釘加密/解密工具
- Python實(shí)現(xiàn)釘釘訂閱消息功能
- Python第三方包之DingDingBot釘釘機(jī)器人
- 如何基于python對接釘釘并獲取access_token
- 詳解使用python3.7配置開發(fā)釘釘群自定義機(jī)器人(2020年新版攻略)
- 淺談Python 釘釘報(bào)警必備知識系統(tǒng)講解