1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124
| import urllib3 import datetime import time import requests import jmespath from pywchat import Sender
# Silence urllib3's InsecureRequestWarning process-wide.
# NOTE(review): presumably needed because BondsCalendar.get_api issues its
# request with verify=False, which would otherwise warn on every call - confirm.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
class BondsCalendar(object):
    """Convertible-bond subscription calendar backed by jisilu.cn.

    Queries the jisilu.cn investment-calendar endpoint for today's events,
    keeps the entries whose title carries the subscription-day tag
    ("【申购日】"), and posts the resulting list as a text message to the
    qyapi.weixin.qq.com webhook.
    """

    # Browser-like headers sent with every calendar request.
    HEADERS = {
        'authority': 'www.jisilu.cn',
        'accept': 'application/json, text/javascript, */*; q=0.01',
        'accept-language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
        'referer': 'https://www.jisilu.cn/data/calendar/',
        'sec-ch-ua': '"Chromium";v="110", "Not A(Brand";v="24", "Microsoft Edge";v="110"',
        'sec-ch-ua-mobile': '?0',
        'sec-ch-ua-platform': '"Windows"',
        'sec-fetch-dest': 'empty',
        'sec-fetch-mode': 'cors',
        'sec-fetch-site': 'same-origin',
        'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/110.0.0.0 Safari/537.36 Edg/110.0.1587.41',
        'x-requested-with': 'XMLHttpRequest',
    }

    # Path of the calendar endpoint, appended to ``self.domain``.
    CALENDAR_API = "/data/calendar/get_calendar_data/"
    # Webhook URL template; ``{key}`` is filled in from ``webhook_key``.
    WEBHOOK_URL = "https://qyapi.weixin.qq.com/cgi-bin/webhook/send?key={key}"
    # Title tag that marks a subscription-day entry in the calendar payload.
    PURCHASE_TAG = "【申购日】"
    # Weekday labels indexed by ``date.weekday()`` (0 = Monday).
    WEEKDAYS = ("星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期天")
    # Seconds to wait for either HTTP call before giving up.
    REQUEST_TIMEOUT = 10

    def __init__(self, webhook_key=None):
        """Create the HTTP session and remember today's date.

        :param webhook_key: optional key substituted into ``WEBHOOK_URL``.
            When omitted the URL is used verbatim, exactly as before this
            parameter existed.
        """
        self.session = requests.session()
        # Copy the headers so per-instance tweaks never mutate the shared
        # class-level dict.
        self.session.headers = dict(self.HEADERS)
        self.domain = "https://www.jisilu.cn"
        self.day = datetime.date.today()
        self.webhook_key = webhook_key

    @staticmethod
    def get_calendar():
        """Return the local-time Unix timestamps bounding today.

        :return: tuple ``(start, end)`` of ints, where ``start`` is today at
            00:00:00 and ``end`` is today at 23:59:59.
        """
        today = datetime.date.today()
        day_start = datetime.datetime.combine(today, datetime.time.min)
        day_end = datetime.datetime.combine(today, datetime.time(23, 59, 59))
        return int(day_start.timestamp()), int(day_end.timestamp())

    def get_api(self, start_tamp, end_tamp):
        """Fetch calendar entries between two timestamps.

        :param start_tamp: 10-digit Unix timestamp, start of the range
        :param end_tamp: 10-digit Unix timestamp, end of the range
        :return: the decoded JSON payload, or ``None`` when the request
            fails, returns an error status, or the body is not valid JSON
            (the error is printed).
        """
        params = {
            "qtype": "CNV",  # presumably the convertible-bond category - verify
            "start": start_tamp,
            "end": end_tamp,
            "_": int(time.time() * 1000),  # current time in milliseconds
        }
        try:
            # NOTE(review): verify=False disables TLS certificate checking;
            # kept as in the original, but worth revisiting.
            rsp = self.session.get(
                self.domain + self.CALENDAR_API,
                params=params,
                verify=False,
                timeout=self.REQUEST_TIMEOUT,
            )
            rsp.raise_for_status()
            return rsp.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers JSON decoding failures on older requests
            # versions, where they are not RequestException subclasses.
            print(e)
            return None

    @staticmethod
    def handle_rsp(rsp):
        """Extract the names of the bonds whose subscription day is listed.

        Keeps every entry whose ``title`` contains the subscription-day tag
        and removes that tag when it is the title's prefix. The tag is
        removed as a whole prefix, not character by character, so names that
        begin with one of the tag's characters stay intact.

        :param rsp: decoded JSON payload (expected to be a list of dicts);
            anything else, including ``None``, yields an empty result
        :return: str with one bond name per line, ``""`` when none match
        """
        if not isinstance(rsp, list):
            return ""

        tag = BondsCalendar.PURCHASE_TAG
        names = []
        for entry in rsp:
            title = entry.get("title") if isinstance(entry, dict) else None
            if not isinstance(title, str) or tag not in title:
                continue
            names.append(title[len(tag):] if title.startswith(tag) else title)
        return "\n".join(names)

    def message_format(self, info):
        """Prefix the bond list with a date / weekday header.

        :param info: str, the already-joined bond names
        :return: str, the complete message to send
        """
        weekday = self.WEEKDAYS[self.day.weekday()]
        template = f"今天({self.day}) {weekday}\n\n可转债申购列表:\n"
        return template + info

    def send_msg(self, msg):
        """Post ``msg`` as a text message to the webhook.

        :param msg: str, message content
        :return: the ``requests`` response object of the POST
        """
        data = {
            "msgtype": "text",
            "text": {
                "content": msg
            }
        }
        # Without a configured key the template is sent verbatim, which is
        # what the code did before ``webhook_key`` existed.
        if self.webhook_key is None:
            url = self.WEBHOOK_URL
        else:
            url = self.WEBHOOK_URL.format(key=self.webhook_key)

        return requests.post(
            url,
            json=data,
            headers={"Content-Type": "application/json"},
            timeout=self.REQUEST_TIMEOUT,
        )

    def run(self):
        """Fetch today's entries and send a message if any bond is listed."""
        tamps = self.get_calendar()
        rsp = self.get_api(*tamps)
        bonds_names = self.handle_rsp(rsp)

        # Nothing to subscribe to today (or the fetch failed): stay silent.
        if not bonds_names:
            return
        msg_info = self.message_format(bonds_names)
        self.send_msg(msg_info)
# Script entry: build the calendar client and run one fetch-and-notify pass.
# NOTE(review): this runs at import time because there is no
# `if __name__ == "__main__":` guard - confirm that is intended.
bond = BondsCalendar()
bond.run()
|