tryhi 发表于 2021-1-18 16:11:43

【转载】阿里云DDNS动态域名解析,无需SDK

找了半天的阿里云解析,论坛也有一篇,但是缺太多代码了,实在不知道怎么改,这个直接填入自己的AccessKeyID, AccessKeySecret,还有要解析的域名就可以直接用,什么都不用改,转载一下,也作为自己一个收藏
代码转自52论坛的 https://www.52pojie.cn/forum.php?mod=viewthread&tid=1296738


import re, json, time, uuid, random, chardet
import hmac, base64, hashlib
import requests
import urllib.parse, urllib.request

# 轮询时间间隔
TIME_INTERVAL = 5


class PublicIP(object):
    def __init__(self):
      self.user_agent = ('Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3776.400 QQBrowser/10.6.4212.400')
      self.api_list = [
            'https://202020.ip138.com/',
            'https://www.ip.cn/api/index?ip=&type=0',
            'http://ip.webmasterhome.cn/',
            'http://ip.yqie.com/clientip.aspx',
            'https://www.123cha.com/ip/',
            'http://ip.chacuo.net/',
      ]

    def ip_query(self):
      while True:
            url = random.sample(self.api_list, 1)
            try:
                res = requests.get(url, headers={'User-Agent': self.user_agent}, timeout=5)
                encoding = chardet.detect(res.content)['encoding']
                html = res.content.decode(encoding)
                out = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', html)
                if out != []: return out
            except Exception as e:
                continue


class AscSigner(object):
    def __init__(self, AccessKeyId, AccessKeySecret):
      self.AccessKeyId = AccessKeyId
      self.AccessKeySecret = AccessKeySecret

    def _utc(self):
      return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    def _uuid(self):
      return str(uuid.uuid4())

    def _pop_standard_urlencode(self, query):
      ret = query.replace('+', '%20')
      ret = ret.replace('*', '%2A')
      ret = ret.replace('%7E', '~')
      return ret

    def _common_param(self):
      return {
            'Format': 'JSON',
            'Version': '2015-01-09',
            'SignatureMethod': 'HMAC-SHA1',
            'SignatureNonce': self._uuid(),
            'SignatureVersion': '1.0',
            'AccessKeyId': self.AccessKeyId,
            'Timestamp': self._utc(),
      }

    def _string_to_sign(self, param):
      sorted_parameters = sorted(list(param.items()), key=lambda param: param)
      sorted_query_string = self._pop_standard_urlencode(urllib.parse.urlencode(sorted_parameters))
      canonicalized_query_string = self._pop_standard_urlencode(urllib.request.pathname2url(sorted_query_string))
      string_to_sign = "GET&%2F&" + canonicalized_query_string
      return string_to_sign

    def _sign(self, string_to_sign):
      param_encode = bytes(string_to_sign, 'utf-8')
      key = bytes(self.AccessKeySecret + '&', 'utf-8')
      h = hmac.new(key, param_encode, hashlib.sha1)
      return str(base64.encodebytes(h.digest()).strip(), "utf-8")

    def get_param_and_sign(self, action_param):
      common_param = self._common_param()
      param = dict(common_param, **action_param)
      string_to_sign = self._string_to_sign(param)
      signature = self._sign(string_to_sign)
      param['Signature'] = signature
      url = '?' + self._pop_standard_urlencode(urllib.parse.urlencode(param))
      return url


class AscRequestParam(object):
    def __init__(self, AccessKeyId, AccessKeySecret):
      self.signer = AscSigner(AccessKeyId, AccessKeySecret)

    # 获取域名列表
    def get_domains_params(self):
      action_param = {
            'Action': 'DescribeDomains',
            'PageSize': '100',
      }
      return self.signer.get_param_and_sign(action_param)

    # 获取解析记录
    def get_domain_records_params(self, domain_name):
      action_param = {
            'Action': 'DescribeDomainRecords',
            'DomainName': domain_name,
            'PageSize': '100',
      }
      return self.signer.get_param_and_sign(action_param)

    # 修改解析记录
    def update_domain_record_params(self, data):
      action_param = {
            'Action': 'UpdateDomainRecord',
            'RR': data['RR'],
            'RecordId': data['RecordId'],
            'Type': data['Type'],
            'Value': data['Value'],
      }
      return self.signer.get_param_and_sign(action_param)

    # 获取解析记录信息
    def get_domain_record_info_params(self, RecordId):
      action_param = {
            'Action': 'DescribeDomainRecordInfo',
            'RecordId': RecordId,
      }
      return self.signer.get_param_and_sign(action_param)


class AscClient(object):
    def __init__(self, AccessKeyId, AccessKeySecret):
      self.ip = PublicIP()
      self.client = AscRequestParam(AccessKeyId, AccessKeySecret)
      self.api = 'https://alidns.aliyuncs.com/'

    def _request(self, params):
      res = requests.get(self.api + params)
      return json.loads(res.content.decode('utf-8'))

    def _wait(self):
      timecount = TIME_INTERVAL
      while timecount >= 0:
            print(f"\r再次检测倒计时:{timecount}秒...", end="", flush=True)
            timecount = timecount - 1
            time.sleep(1)
      print('\n\n', end="", flush=True)

    def update_domain_record(self, record):
      ip = self.ip.ip_query()
      print(f'公网IP:{ip},')
      if ip == record['Value']:
            print('公网IP和解析IP相同,无需更新')
            return
      record['Value'] = ip
      params = self.client.update_domain_record_params(record)
      self._request(params)
      print('解析完成')

    def get_domain_records(self, RR, domain_name):
      params = self.client.get_domain_records_params(domain_name)
      res = self._request(params)
      TotalCount = int(res['TotalCount'])
      for i in res['DomainRecords']['Record']:
            if i['RR'] == RR:
                return i
      return None

    def get_domains(self):
      params = self.client.get_domains_params()
      res = self._request(params)
      domain_list = []
      for i in res['Domains']['Domain']:
            domain_list.append(i['DomainName'])
      return int(res['TotalCount']), domain_list

    def start(self, RR, domain_name):
      while True:
            count, domains = self.get_domains()
            if count <= 0 or not domain_name in domains:
                print('没有此域名')
                return
            record = self.get_domain_records(RR, domain_name)
            if not record:
                print(f'没有找到{RR}的解析记录')
                return
            if not record['Type'] == 'A':
                print('暂时只支持解析A记录')
                return
            if record['Status'] == 'DISABLE':
                print('此记录已被禁用,请启用后再试')
                return
            self.update_domain_record(record)
            self._wait()


if __name__ == '__main__':
    # 换成你自己的AccessKeyID, AccessKeySecret
    a = AscClient('**********', '**********************')

    # 如果需要解析 www.baidu.com,填入 www 和 baidu.com
    # 如果需要解析 baidu.com填入 @ 和 baidu.com
    a.start('test', 'mydomain.net')

qq1151985918 发表于 2021-1-18 16:14:11

66666666666666666666666
页: [1]
查看完整版本: 【转载】阿里云DDNS动态域名解析,无需SDK