|
马上注册,结交更多好友,享用更多功能^_^
您需要 登录 才可以下载或查看,没有账号?立即注册
x
找了半天阿里云解析的代码。论坛里也有一篇,但缺的代码太多,实在不知道怎么改。下面这份只需填入自己的 AccessKeyID、AccessKeySecret 以及要解析的域名就可以直接使用,其他什么都不用改。转载一下,也作为自己的收藏。
代码转自52论坛的 https://www.52pojie.cn/forum.php?mod=viewthread&tid=1296738
- import re, json, time, uuid, random, chardet
- import hmac, base64, hashlib
- import requests
- import urllib.parse, urllib.request
- # Polling interval: AscClient._wait counts down from this value, sleeping one second per tick.
- TIME_INTERVAL = 5
class PublicIP(object):
    """Find an IPv4 address by fetching one of several web pages.

    ``ip_query`` downloads a page from ``api_list`` and returns the first
    syntactically valid IPv4 address that appears in the response body.

    Instance attributes:
        user_agent: User-Agent header sent with every request.
        api_list: candidate page URLs, tried in random order.
    """

    # Candidate dotted quads; octet ranges are validated in _extract_ip.
    _IPV4_PATTERN = re.compile(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}')
    # Seconds to pause after every endpoint has failed once, so an outage
    # does not turn the retry loop into a busy loop.
    _RETRY_DELAY = 1

    def __init__(self):
        self.user_agent = ('Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3776.400 QQBrowser/10.6.4212.400')
        self.api_list = [
            'https://202020.ip138.com/',
            'https://www.ip.cn/api/index?ip=&type=0',
            'http://ip.webmasterhome.cn/',
            'http://ip.yqie.com/clientip.aspx',
            'https://www.123cha.com/ip/',
            'http://ip.chacuo.net/',
        ]

    @staticmethod
    def _extract_ip(text):
        """Return the first valid IPv4 address found in *text*, or None."""
        # Function-scope import: ipaddress is not among the file's top-level imports.
        import ipaddress

        for candidate in PublicIP._IPV4_PATTERN.findall(text):
            try:
                ipaddress.IPv4Address(candidate)
            except ValueError:
                # Matches the digit pattern but is not a real address (e.g. 999.1.1.1).
                continue
            return candidate
        return None

    def ip_query(self):
        """Return an IPv4 address string scraped from one of the pages in ``api_list``.

        Endpoints are tried in random order; the method keeps retrying
        (pausing between full rounds) until one of them yields an address.

        Raises:
            ValueError: if ``api_list`` is empty.
        """
        if not self.api_list:
            raise ValueError('api_list is empty')
        while True:
            # A shuffled copy visits every endpoint once per round instead of
            # possibly re-picking the same failing endpoint over and over.
            for url in random.sample(self.api_list, len(self.api_list)):
                try:
                    res = requests.get(url, headers={'User-Agent': self.user_agent}, timeout=5)
                    # chardet reports None when it cannot guess; the address
                    # itself is ASCII, so a lossy UTF-8 decode is still usable.
                    encoding = chardet.detect(res.content)['encoding'] or 'utf-8'
                    html = res.content.decode(encoding, errors='replace')
                except (requests.RequestException, LookupError, ValueError):
                    # Network failure or undecodable body: try the next endpoint.
                    continue
                ip = self._extract_ip(html)
                if ip is not None:
                    return ip
            time.sleep(self._RETRY_DELAY)
class AscSigner(object):
    """Build signed query strings (HMAC-SHA1, signature version 1.0) for the DNS API.

    Args:
        AccessKeyId: access key id, sent as the ``AccessKeyId`` parameter.
        AccessKeySecret: secret used (with a trailing ``&``) as the HMAC key.
    """

    def __init__(self, AccessKeyId, AccessKeySecret):
        self.AccessKeyId = AccessKeyId
        self.AccessKeySecret = AccessKeySecret

    def _utc(self):
        """Return the current UTC time formatted as ``YYYY-MM-DDTHH:MM:SSZ``."""
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    def _uuid(self):
        """Return a fresh random UUID string, used as the signature nonce."""
        return str(uuid.uuid4())

    def _pop_standard_urlencode(self, query):
        """Normalize an already URL-encoded string.

        Rewrites ``+`` to ``%20``, ``*`` to ``%2A`` and ``%7E`` back to ``~``.
        """
        ret = query.replace('+', '%20')
        ret = ret.replace('*', '%2A')
        ret = ret.replace('%7E', '~')
        return ret

    def _common_param(self):
        """Return the parameters included in every request (new nonce and timestamp per call)."""
        return {
            'Format': 'JSON',
            'Version': '2015-01-09',
            'SignatureMethod': 'HMAC-SHA1',
            'SignatureNonce': self._uuid(),
            'SignatureVersion': '1.0',
            'AccessKeyId': self.AccessKeyId,
            'Timestamp': self._utc(),
        }

    def _string_to_sign(self, param):
        """Return the canonical string to sign for the parameter mapping *param*.

        Parameters are sorted by name, URL-encoded, and the resulting query
        string is percent-encoded a second time and prefixed with ``GET&%2F&``.
        """
        sorted_parameters = sorted(param.items(), key=lambda item: item[0])
        sorted_query_string = self._pop_standard_urlencode(urllib.parse.urlencode(sorted_parameters))
        # urllib.parse.quote is the URL-component encoder; the previously used
        # urllib.request.pathname2url is a platform-dependent file-path helper.
        # safe='' so that every reserved character ('&', '=', '%', ...) is escaped.
        canonicalized_query_string = self._pop_standard_urlencode(
            urllib.parse.quote(sorted_query_string, safe=''))
        return "GET&%2F&" + canonicalized_query_string

    def _sign(self, string_to_sign):
        """Return the base64-encoded HMAC-SHA1 of *string_to_sign*, keyed with ``AccessKeySecret + '&'``."""
        key = (self.AccessKeySecret + '&').encode('utf-8')
        digest = hmac.new(key, string_to_sign.encode('utf-8'), hashlib.sha1).digest()
        return base64.b64encode(digest).decode('ascii')

    def get_param_and_sign(self, action_param):
        """Return ``'?' + query`` holding the common parameters, *action_param* and the signature.

        Entries in *action_param* override common parameters of the same name.
        *action_param* itself is not modified.
        """
        param = {**self._common_param(), **action_param}
        param['Signature'] = self._sign(self._string_to_sign(param))
        return '?' + self._pop_standard_urlencode(urllib.parse.urlencode(param))
class AscRequestParam(object):
    """Produce signed query strings for the individual API actions used by the client."""

    def __init__(self, AccessKeyId, AccessKeySecret):
        self.signer = AscSigner(AccessKeyId, AccessKeySecret)

    def _signed(self, action, **fields):
        """Sign a request whose parameters are ``Action`` followed by *fields*, in that order."""
        return self.signer.get_param_and_sign({'Action': action, **fields})

    def get_domains_params(self):
        """Signed query for listing domains (page size 100)."""
        return self._signed('DescribeDomains', PageSize='100')

    def get_domain_records_params(self, domain_name):
        """Signed query for listing the records of *domain_name* (page size 100)."""
        return self._signed('DescribeDomainRecords', DomainName=domain_name, PageSize='100')

    def update_domain_record_params(self, data):
        """Signed query for updating a record.

        *data* must provide the keys ``RR``, ``RecordId``, ``Type`` and ``Value``.
        """
        return self._signed(
            'UpdateDomainRecord',
            RR=data['RR'],
            RecordId=data['RecordId'],
            Type=data['Type'],
            Value=data['Value'],
        )

    def get_domain_record_info_params(self, RecordId):
        """Signed query for fetching the details of the record *RecordId*."""
        return self._signed('DescribeDomainRecordInfo', RecordId=RecordId)
class AscClient(object):
    """Polling client that keeps one DNS A record equal to the address reported by PublicIP."""

    # Seconds to wait for the DNS API before a request is abandoned; without a
    # timeout a stalled connection would block the polling loop indefinitely.
    REQUEST_TIMEOUT = 10

    def __init__(self, AccessKeyId, AccessKeySecret):
        self.ip = PublicIP()
        self.client = AscRequestParam(AccessKeyId, AccessKeySecret)
        self.api = 'https://alidns.aliyuncs.com/'

    def _request(self, params):
        """GET ``self.api + params`` and return the decoded JSON body."""
        res = requests.get(self.api + params, timeout=self.REQUEST_TIMEOUT)
        return json.loads(res.content.decode('utf-8'))

    def _wait(self):
        """Print a countdown from TIME_INTERVAL down to 0, sleeping one second per tick."""
        timecount = TIME_INTERVAL
        while timecount >= 0:
            print(f"\r再次检测倒计时:{timecount}秒...", end="", flush=True)
            timecount = timecount - 1
            time.sleep(1)
        print('\n\n', end="", flush=True)

    def update_domain_record(self, record):
        """Point *record* at the current public IP unless it already matches.

        *record* is a record dict as returned by ``get_domain_records``; its
        ``Value`` entry is overwritten with the new address before the update
        request is sent.
        """
        ip = self.ip.ip_query()
        print(f'公网IP:{ip},')
        if ip == record['Value']:
            print('公网IP和解析IP相同,无需更新')
            return
        record['Value'] = ip
        params = self.client.update_domain_record_params(record)
        res = self._request(params)
        # The API reports failures as a JSON body carrying 'Code'/'Message';
        # do not claim success when that is what came back.
        if isinstance(res, dict) and 'Code' in res:
            print(f"解析失败:{res['Code']} {res.get('Message', '')}")
            return
        print('解析完成')

    def get_domain_records(self, RR, domain_name):
        """Return the first record of *domain_name* whose ``RR`` equals *RR*, or None."""
        params = self.client.get_domain_records_params(domain_name)
        res = self._request(params)
        return next((rec for rec in res['DomainRecords']['Record'] if rec['RR'] == RR), None)

    def get_domains(self):
        """Return ``(total_count, domain_names)`` from the domain listing."""
        params = self.client.get_domains_params()
        res = self._request(params)
        domain_list = [entry['DomainName'] for entry in res['Domains']['Domain']]
        return int(res['TotalCount']), domain_list

    def _sync_once(self, RR, domain_name):
        """Run one check/update cycle; return False when polling should stop."""
        count, domains = self.get_domains()
        if count <= 0 or domain_name not in domains:
            print('没有此域名')
            return False
        record = self.get_domain_records(RR, domain_name)
        if not record:
            print(f'没有找到{RR}的解析记录')
            return False
        if record['Type'] != 'A':
            print('暂时只支持解析A记录')
            return False
        if record['Status'] == 'DISABLE':
            print('此记录已被禁用,请启用后再试')
            return False
        self.update_domain_record(record)
        return True

    def start(self, RR, domain_name):
        """Poll forever, updating the *RR* record of *domain_name* when the public IP changes.

        Returns (after printing the reason) when the domain or record is
        missing, the record is not an A record, or the record is disabled.
        Network and JSON-decoding errors are reported and retried on the next
        cycle instead of ending the loop.
        """
        while True:
            try:
                if not self._sync_once(RR, domain_name):
                    return
            except (requests.RequestException, json.JSONDecodeError) as err:
                print(f'请求失败,稍后重试:{err}')
            self._wait()
if __name__ == '__main__':
    # Replace the placeholders with your own AccessKeyID and AccessKeySecret.
    client = AscClient('**********', '**********************')
    # To resolve www.baidu.com, pass 'www' and 'baidu.com';
    # to resolve the bare domain baidu.com, pass '@' and 'baidu.com'.
    client.start(RR='test', domain_name='mydomain.net')
复制代码 |
|