鱼C论坛

 找回密码
 立即注册
查看: 1751|回复: 1

[技术交流] 【转载】阿里云DDNS动态域名解析,无需SDK

[复制链接]
发表于 2021-1-18 16:11:43 | 显示全部楼层 |阅读模式

马上注册,结交更多好友,享用更多功能^_^

您需要 登录 才可以下载或查看,没有账号?立即注册

x
找了半天的阿里云解析,论坛也有一篇,但是缺太多代码了,实在不知道怎么改,这个直接填入自己的AccessKeyID, AccessKeySecret,还有要解析的域名就可以直接用,什么都不用改,转载一下,也作为自己一个收藏
代码转自52论坛的 https://www.52pojie.cn/forum.php?mod=viewthread&tid=1296738

import re, json, time, uuid, random, chardet
import hmac, base64, hashlib
import requests
import urllib.parse, urllib.request

# 轮询时间间隔
TIME_INTERVAL = 5


class PublicIP(object):
    def __init__(self):
        self.user_agent = ('Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) '
                           'Chrome/70.0.3538.25 Safari/537.36 Core/1.70.3776.400 QQBrowser/10.6.4212.400')
        self.api_list = [
            'https://202020.ip138.com/',
            'https://www.ip.cn/api/index?ip=&type=0',
            'http://ip.webmasterhome.cn/',
            'http://ip.yqie.com/clientip.aspx',
            'https://www.123cha.com/ip/',
            'http://ip.chacuo.net/',
        ]

    def ip_query(self):
        while True:
            url = random.sample(self.api_list, 1)[0]
            try:
                res = requests.get(url, headers={'User-Agent': self.user_agent}, timeout=5)
                encoding = chardet.detect(res.content)['encoding']
                html = res.content.decode(encoding)
                out = re.findall(r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}', html)
                if out != []: return out[0]
            except Exception as e:
                continue


class AscSigner(object):
    def __init__(self, AccessKeyId, AccessKeySecret):
        self.AccessKeyId = AccessKeyId
        self.AccessKeySecret = AccessKeySecret

    def _utc(self):
        return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())

    def _uuid(self):
        return str(uuid.uuid4())

    def _pop_standard_urlencode(self, query):
        ret = query.replace('+', '%20')
        ret = ret.replace('*', '%2A')
        ret = ret.replace('%7E', '~')
        return ret

    def _common_param(self):
        return {
            'Format': 'JSON',
            'Version': '2015-01-09',
            'SignatureMethod': 'HMAC-SHA1',
            'SignatureNonce': self._uuid(),
            'SignatureVersion': '1.0',
            'AccessKeyId': self.AccessKeyId,
            'Timestamp': self._utc(),
        }

    def _string_to_sign(self, param):
        sorted_parameters = sorted(list(param.items()), key=lambda param: param[0])
        sorted_query_string = self._pop_standard_urlencode(urllib.parse.urlencode(sorted_parameters))
        canonicalized_query_string = self._pop_standard_urlencode(urllib.request.pathname2url(sorted_query_string))
        string_to_sign = "GET&%2F&" + canonicalized_query_string
        return string_to_sign

    def _sign(self, string_to_sign):
        param_encode = bytes(string_to_sign, 'utf-8')
        key = bytes(self.AccessKeySecret + '&', 'utf-8')
        h = hmac.new(key, param_encode, hashlib.sha1)
        return str(base64.encodebytes(h.digest()).strip(), "utf-8")

    def get_param_and_sign(self, action_param):
        common_param = self._common_param()
        param = dict(common_param, **action_param)
        string_to_sign = self._string_to_sign(param)
        signature = self._sign(string_to_sign)
        param['Signature'] = signature
        url = '?' + self._pop_standard_urlencode(urllib.parse.urlencode(param))
        return url


class AscRequestParam(object):
    def __init__(self, AccessKeyId, AccessKeySecret):
        self.signer = AscSigner(AccessKeyId, AccessKeySecret)

    # 获取域名列表
    def get_domains_params(self):
        action_param = {
            'Action': 'DescribeDomains',
            'PageSize': '100',
        }
        return self.signer.get_param_and_sign(action_param)

    # 获取解析记录
    def get_domain_records_params(self, domain_name):
        action_param = {
            'Action': 'DescribeDomainRecords',
            'DomainName': domain_name,
            'PageSize': '100',
        }
        return self.signer.get_param_and_sign(action_param)

    # 修改解析记录
    def update_domain_record_params(self, data):
        action_param = {
            'Action': 'UpdateDomainRecord',
            'RR': data['RR'],
            'RecordId': data['RecordId'],
            'Type': data['Type'],
            'Value': data['Value'],
        }
        return self.signer.get_param_and_sign(action_param)

    # 获取解析记录信息
    def get_domain_record_info_params(self, RecordId):
        action_param = {
            'Action': 'DescribeDomainRecordInfo',
            'RecordId': RecordId,
        }
        return self.signer.get_param_and_sign(action_param)


class AscClient(object):
    def __init__(self, AccessKeyId, AccessKeySecret):
        self.ip = PublicIP()
        self.client = AscRequestParam(AccessKeyId, AccessKeySecret)
        self.api = 'https://alidns.aliyuncs.com/'

    def _request(self, params):
        res = requests.get(self.api + params)
        return json.loads(res.content.decode('utf-8'))

    def _wait(self):
        timecount = TIME_INTERVAL
        while timecount >= 0:
            print(f"\r再次检测倒计时:{timecount}秒...", end="", flush=True)
            timecount = timecount - 1
            time.sleep(1)
        print('\n\n', end="", flush=True)

    def update_domain_record(self, record):
        ip = self.ip.ip_query()
        print(f'公网IP:{ip},')
        if ip == record['Value']:
            print('公网IP和解析IP相同,无需更新')
            return
        record['Value'] = ip
        params = self.client.update_domain_record_params(record)
        self._request(params)
        print('解析完成')

    def get_domain_records(self, RR, domain_name):
        params = self.client.get_domain_records_params(domain_name)
        res = self._request(params)
        TotalCount = int(res['TotalCount'])
        for i in res['DomainRecords']['Record']:
            if i['RR'] == RR:
                return i
        return None

    def get_domains(self):
        params = self.client.get_domains_params()
        res = self._request(params)
        domain_list = []
        for i in res['Domains']['Domain']:
            domain_list.append(i['DomainName'])
        return int(res['TotalCount']), domain_list

    def start(self, RR, domain_name):
        while True:
            count, domains = self.get_domains()
            if count <= 0 or not domain_name in domains:
                print('没有此域名')
                return
            record = self.get_domain_records(RR, domain_name)
            if not record:
                print(f'没有找到{RR}的解析记录')
                return
            if not record['Type'] == 'A':
                print('暂时只支持解析A记录')
                return
            if record['Status'] == 'DISABLE':
                print('此记录已被禁用,请启用后再试')
                return
            self.update_domain_record(record)
            self._wait()


if __name__ == '__main__':
    # 换成你自己的AccessKeyID, AccessKeySecret
    a = AscClient('**********', '**********************')

    # 如果需要解析 www.baidu.com,填入 www 和 baidu.com
    # 如果需要解析 baidu.com  填入 @ 和 baidu.com
    a.start('test', 'mydomain.net')
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复

使用道具 举报

发表于 2021-1-18 16:14:11 | 显示全部楼层
66666666666666666666666
想知道小甲鱼最近在做啥?请访问 -> ilovefishc.com
回复 支持 反对

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

小黑屋|手机版|Archiver|鱼C工作室 ( 粤ICP备18085999号-1 | 粤公网安备 44051102000585号)

GMT+8, 2025-1-16 18:51

Powered by Discuz! X3.4

© 2001-2023 Discuz! Team.

快速回复 返回顶部 返回列表