# --- spiders/MediaSpiders/MediaSpiders/items.py (addition) ---


class TwitterUserInfoItem(scrapy.Item):
    """Scrapy Item mapped 1:1 onto the ``dsp.twitter_user_info`` MySQL table."""

    crawl_time = scrapy.Field()     # DATETIME - time the data was crawled
    is_newest = scrapy.Field()      # TINYINT(1) - latest-record flag
    platform_type = scrapy.Field()  # VARCHAR(20) - platform type

    user_id = scrapy.Field()        # BIGINT UNSIGNED - unique Twitter user id
    username = scrapy.Field()       # VARCHAR(50) - handle (the part after @)
    nickname = scrapy.Field()       # VARCHAR(100) - display name
    user_url = scrapy.Field()       # VARCHAR(255) - profile page URL
    avatar_url = scrapy.Field()     # VARCHAR(500) - original avatar URL
    avatar_path = scrapy.Field()    # VARCHAR(255) - local avatar file path

    intro = scrapy.Field()          # TEXT - bio / description
    city = scrapy.Field()           # VARCHAR(100) - location
    join_date = scrapy.Field()      # DATETIME - account creation time

    post_count = scrapy.Field()     # INT UNSIGNED - tweet count
    is_verified = scrapy.Field()    # VARCHAR(10) - "True"/"False"
    follow_count = scrapy.Field()   # INT UNSIGNED - following count
    fans_count = scrapy.Field()     # INT UNSIGNED - follower count
    image_urls = scrapy.Field()     # consumed by ImagesPipeline; not persisted


# --- spiders/MediaSpiders/MediaSpiders/pipelines.py (addition) ---


class TwitterUserDataSaveToMySQL(object):
    """Item pipeline that upserts ``TwitterUserInfoItem`` rows into
    ``dsp.twitter_user_info``.

    A stable UUID derived from the Twitter user id serves as the primary
    key, so an incoming item can be matched against the existing row and
    either updated (only when fields actually changed) or inserted.
    """

    TABLE_NAME = 'twitter_user_info'
    # Columns managed by the database, or not persisted at all; never
    # written by this pipeline.
    _SKIP_FIELDS = ('id', 'created_at', 'updated_at', 'image_urls')

    def __init__(self):
        self.db = None
        self.cursor = None

    def open_spider(self, spider):
        # Connection parameters come from settings.py (the
        # TWITTER_USER_MYSQL_DB_* keys added alongside this pipeline);
        # the literals below are fallbacks so existing deployments keep
        # working unchanged.
        settings = spider.settings
        self.db = pymysql.connect(
            host=settings.get('TWITTER_USER_MYSQL_DB_HOST', '47.113.231.200'),
            port=settings.getint('TWITTER_USER_MYSQL_DB_PORT', 28089),
            user=settings.get('TWITTER_USER_MYSQL_DB_USER', 'root'),
            passwd=settings.get('TWITTER_USER_MYSQL_DB_PASSWD', 'passok123A'),
            db=settings.get('TWITTER_USER_MYSQL_DB_SCHEMA', 'dsp'),
            charset='utf8mb4')
        self.cursor = self.db.cursor()

    def close_spider(self, spider):
        if self.cursor:
            self.cursor.close()
        if self.db:
            self.db.close()

    @staticmethod
    def _normalize_avatar_path(value):
        """Flatten the ImagesPipeline result (list of dicts / dict / str)
        into a single path string; '' when nothing was stored."""
        if isinstance(value, list) and value:
            first = value[0]
            return first.get('path', '') if isinstance(first, dict) else str(first)
        if isinstance(value, dict):
            return value.get('path', '')
        return str(value) if value else ''

    def process_item(self, item, spider):
        # Only handle Twitter user items; let everything else pass through
        # untouched for downstream pipelines.
        if item.__class__.__name__ != 'TwitterUserInfoItem':
            return item

        item['avatar_path'] = self._normalize_avatar_path(item.get('avatar_path'))

        try:
            user_id = item.get('user_id')
            if not user_id:
                logging.warning("缺少 user_id,跳过处理。")
                return item

            # Stable UUID per user id: the same user always maps to the
            # same primary key, enabling the insert-vs-update decision.
            stable_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"twitter_user_{user_id}"))

            existing = self._select_by_uuid(stable_uuid)
            if existing:
                if self._needs_update(existing, item):
                    self._update_item(stable_uuid, item)
                    logging.info(f"用户 {user_id} 数据已更新。")
                else:
                    logging.debug(f"用户 {user_id} 数据无变化,跳过更新。")
            else:
                self._insert_item_with_uuid(stable_uuid, item)
                logging.info(f"用户 {user_id} 新数据已插入。")

        except Exception as e:
            spider.logger.error(f"处理用户数据失败 (user_id={item.get('user_id')}): {e}")
            raise DropItem(f"Database error: {e}")

        return item

    def _select_by_uuid(self, record_uuid):
        """Fetch the full row for ``record_uuid`` as a dict, or None."""
        sql = f"SELECT * FROM dsp.{self.TABLE_NAME} WHERE id = %s"
        self.cursor.execute(sql, (record_uuid,))
        row = self.cursor.fetchone()
        if row is None:
            return None
        columns = [desc[0] for desc in self.cursor.description]
        return dict(zip(columns, row))

    def _needs_update(self, db_record, item):
        """Return True when any persisted field differs from the DB row.

        NOTE(review): numeric columns come back from MySQL as int while
        the spider may carry them as str, which can report spurious
        differences — confirm field types against the spider's output.
        """
        for field in item.fields:
            if field in self._SKIP_FIELDS:
                continue
            item_val = item.get(field)
            db_val = db_record.get(field)
            # Normalize: None and '' are treated as the same "empty".
            if item_val is None or item_val == '':
                item_val = None
            if db_val is None or db_val == '':
                db_val = None
            if item_val != db_val:
                return True
        return False

    def _update_item(self, record_uuid, item):
        """UPDATE the non-empty fields of ``item`` and bump ``updated_at``."""
        assignments = []
        params = []
        for field in item.fields:
            if field in self._SKIP_FIELDS:
                continue
            value = item.get(field)
            # Empty values never overwrite existing data.
            if value is None or value == '':
                continue
            assignments.append(f"{field} = %s")
            params.append(value)

        if not assignments:
            return

        params.append(record_uuid)  # WHERE id = %s
        sql = (f"UPDATE dsp.{self.TABLE_NAME} SET {', '.join(assignments)}, "
               f"updated_at = CURRENT_TIMESTAMP WHERE id = %s")
        try:
            self.cursor.execute(sql, params)
            self.db.commit()
        except Exception:
            # Leave no open transaction behind; caller turns this into DropItem.
            self.db.rollback()
            raise

    def _insert_item_with_uuid(self, record_uuid, item):
        """INSERT a new row whose primary key is ``record_uuid``."""
        cols = ['id']
        vals = [record_uuid]

        for field in item.fields:
            if field in self._SKIP_FIELDS:
                continue
            # avatar_path was already normalized in process_item.
            value = item.get(field)
            # Skip None and empty strings.
            if value is None or value == '':
                continue
            cols.append(field)
            vals.append(value)

        # Only 'id' means the item carried no data worth persisting.
        if len(cols) == 1:
            logging.warning("没有有效的字段可供插入。")
            return

        placeholders = ', '.join(['%s'] * len(cols))
        sql = f"INSERT INTO dsp.{self.TABLE_NAME} ({', '.join(cols)}) VALUES ({placeholders})"

        try:
            self.cursor.execute(sql, vals)
            self.db.commit()
        except pymysql.err.IntegrityError as ie:
            self.db.rollback()
            logging.debug(f"数据重复,已跳过插入:{ie}")
        except pymysql.err.DataError as de:
            self.db.rollback()
            logging.error(f"数据格式错误(如字段超长、类型不匹配等):{de}")
            raise
        except Exception as e:
            self.db.rollback()
            logging.error(f"数据库操作发生未知错误:{e}")
            raise
# --- spiders/MediaSpiders/MediaSpiders/spiders/TwitterUserInfoSpider.py (new file) ---
# -*- coding: utf-8 -*-
from datetime import datetime
from datetime import timezone, timedelta
import json
import logging as logger
import random
import re
import time
from urllib import parse

import redis
import scrapy
from scrapy_selenium import SeleniumRequest
from MediaSpiders.items import MediaspidersItem, TwitterUserInfoItem
from MediaSpiders.utils.http_utils import http_post
from MediaSpiders.utils.login_utils import login
from MediaSpiders.utils.time_utils import get_time_stamp, get_current_timestamp


def form_cookie_dict(cookie_string):
    """Parse a raw ``Cookie`` header string into a name -> value dict.

    Splits each segment on the FIRST '=' only, so cookie values that
    themselves contain '=' (e.g. base64 padding) stay intact, and
    segments without '=' are skipped instead of raising IndexError.
    """
    cookie_dict = {}
    for segment in cookie_string.split(';'):
        if '=' not in segment:
            continue
        key, _, value = segment.partition('=')
        cookie_dict[key.replace(' ', '')] = value
    return cookie_dict


class TwitterSpider(scrapy.Spider):
    """Collect Twitter/X user-profile info through the ``UserByScreenName``
    GraphQL endpoint, download avatars via ImagesPipeline and hand the
    items to ``TwitterUserDataSaveToMySQL``."""

    name = 'TwitterUserInfoSpider'
    custom_settings = {

        'PROTO_SAVE_FILE_NAME': 'public_twitter_user_info_data_',
        'IMAGES_STORE': r'/usr/local/temp_image/twitteruserinfo',
        'IMAGES_RESULT_FIELD': 'avatar_path',
        'ITEM_PIPELINES': {
            'scrapy.pipelines.images.ImagesPipeline': 2,
            'MediaSpiders.pipelines.TwitterUserDataSaveToMySQL': 300,
        },
        'SPIDER_MIDDLEWARES': {
            'MediaSpiders.middlewares.DumpFilterSpiderMiddleware': 543,
            'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware': None
        }

    }

    def __init__(self, params=None, *args, **kwargs):
        super(TwitterSpider, self).__init__(*args, **kwargs)
        # Defaults; each key may be overridden via the -a params JSON string.
        self.total_num = 100
        # Public web-client bearer token used for the GraphQL API.
        self.authorization = 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'
        if params:
            json_params = json.loads(params)
            if 'totalNum' in json_params:
                self.total_num = int(json_params['totalNum'])
            if 'authorization' in json_params:
                self.authorization = json_params['authorization']
            if 'job_id' in json_params:
                self.job_id = json_params['job_id']

    def start_requests(self):
        # Throwaway Selenium request: its only purpose is to reach the
        # login_twitter callback with a live driver/session.
        yield SeleniumRequest(url='https://www.google.com/', callback=self.login_twitter)

    def login_twitter(self, response):
        self.redis_client = redis.Redis(host=self.settings['REDIS_HOST'],
                                        port=self.settings['REDIS_PORT'],
                                        password=self.settings['REDIS_PWD'])
        self.simhash_filter_key = self.settings['TWITTER_SIMHASH_FILTER_KEY']
        # Fetch the configured crawl accounts (currently unused because the
        # Selenium login below is disabled).
        login_users = self.redis_client.smembers('MediaSpiders:Twitter_login_accounts')
        # Selenium-driven login is disabled for now; cookies are read
        # straight from redis instead.
        # try:
        #     driver = login().login_with_selenium(
        #         'https://x.com/i/flow/login',
        #         self.name,
        #         login_users=login_users,
        #         response=response
        #     )
        #     cookies = driver.get_cookies()
        #     self.cookie_dict = {}
        #     for cookie in cookies:
        #         self.cookie_dict[cookie['name']] = cookie['value']
        # except Exception as e:
        #     logger.info("自动化获取cookies失败")
        raw_cookies = self.redis_client.get("MediaSpiders:Twitter_Cookies")
        if raw_cookies is None:
            # Guard: redis .get() returns None for a missing key; the old
            # unconditional .decode() raised AttributeError here.
            logger.error("redis中缺失 MediaSpiders:Twitter_Cookies,无法继续采集!")
            return
        cookie_string = raw_cookies.decode()
        self.cookie_dict = form_cookie_dict(cookie_string)

        ct0 = self.cookie_dict.get('ct0')
        if not ct0:
            logger.error("redis中cookie缺失ct0 (CSRF token)!")
            return
        self.header = {
            'Host': 'api.twitter.com',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0',
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'content-type': 'application/json',
            'authorization': self.authorization,
            'Origin': 'https://twitter.com',
            'Cookie': cookie_string,
            'X-Csrf-Token': ct0
        }
        self.filter_key = self.settings['TWITTER_FILTER_KEY']
        self.pid_key = self.settings['TWITTER_PID_KEY']
        url_key = self.redis_client.get("MediaSpiders:Twitter_URL_Key").decode()
        # Query the backend for the list of target accounts to crawl.
        account_query_api = self.settings['SOCIAL_USER_QUERY_ALL_API']
        account_query_api = account_query_api.format(sortBy="id", shuffleResult="true")
        post_data = {
            'userType': self.settings['TWITTER_USER_TYPE'],
            'userFlag': 0
        }
        account_rsp = json.loads(
            http_post(account_query_api, json.dumps(post_data), headers={"Content-Type": "application/json"}).text)
        all_user_info = []
        if account_rsp['code'] == 200:
            all_user_info = account_rsp['content']
        for user_info in all_user_info:
            graphql_url = f'https://x.com/i/api/graphql/-oaLodhGbbnzJBACb1kk2Q/UserByScreenName?variables=%7B%22screen_name%22%3A%22{user_info["userName"]}%22%2C%22withGrokTranslatedBio%22%3Afalse%7D&features=%7B%22hidden_profile_subscriptions_enabled%22%3Atrue%2C%22profile_label_improvements_pcf_label_in_post_enabled%22%3Atrue%2C%22responsive_web_profile_redirect_enabled%22%3Afalse%2C%22rweb_tipjar_consumption_enabled%22%3Atrue%2C%22verified_phone_label_enabled%22%3Afalse%2C%22subscriptions_verification_info_is_identity_verified_enabled%22%3Atrue%2C%22subscriptions_verification_info_verified_since_enabled%22%3Atrue%2C%22highlights_tweets_tab_ui_enabled%22%3Atrue%2C%22responsive_web_twitter_article_notes_tab_enabled%22%3Atrue%2C%22subscriptions_feature_can_gift_premium%22%3Atrue%2C%22creator_subscriptions_tweet_preview_api_enabled%22%3Atrue%2C%22responsive_web_graphql_skip_user_profile_image_extensions_enabled%22%3Afalse%2C%22responsive_web_graphql_timeline_navigation_enabled%22%3Atrue%7D&fieldToggles=%7B%22withPayments%22%3Afalse%2C%22withAuxiliaryUserLabels%22%3Atrue%7D'
            yield scrapy.Request(url=graphql_url, callback=self.parse,
                                 meta={
                                     'uid': user_info['userUid'],
                                     'uname': user_info['userName'],
                                     'proxy': 'http://127.0.0.1:10809',
                                     'currentCount': 0
                                 },
                                 cookies=self.cookie_dict, headers=self.header)

    def parse(self, response):
        """Map one UserByScreenName GraphQL response onto a TwitterUserInfoItem."""
        uid = response.request.meta['uid']
        uname = response.request.meta['uname']
        current_count = response.request.meta['currentCount']
        if current_count > 0:
            self.logger.info("翻页采集:第%s页" % int(current_count / 20 + 1))
        else:
            self.logger.info("首页采集")
        try:
            rsp = json.loads(response.text)
            user_result = rsp['data']['user']['result']
            item = TwitterUserInfoItem()
            item['crawl_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            item['is_newest'] = 1
            item['platform_type'] = "Twitter"
            item['user_id'] = user_result['rest_id']
            # BUGFIX: screen_name is the @handle and name is the display
            # name; the two were previously assigned swapped, contradicting
            # the item's documented column semantics.
            item['username'] = user_result['core']['screen_name']
            item['nickname'] = user_result['core']['name']
            item['user_url'] = f'https://x.com/{uname}'
            item['avatar_url'] = user_result['avatar']['image_url']
            item['image_urls'] = [user_result['avatar']['image_url']]
            item['intro'] = user_result['legacy']['description']
            # Real location extraction replaces the leftover 'ceshi' test
            # placeholder. NOTE(review): depending on payload version the
            # location is either a plain string under legacy or a nested
            # {'location': ...} dict — confirm against live responses.
            location = user_result.get('location') or user_result.get('legacy', {}).get('location', '')
            if isinstance(location, dict):
                location = location.get('location', '')
            item['city'] = (location or '').strip()
            try:
                # created_at is a UTC timestamp (ms); render it in UTC+8.
                ts = get_time_stamp(str(user_result['core']['created_at']))
                dt = datetime.fromtimestamp(ts / 1000, tz=timezone(timedelta(hours=8)))
                item['join_date'] = dt.strftime('%Y-%m-%d %H:%M:%S')  # e.g. '2012-06-28 12:25:01'
            except (ValueError, KeyError) as e:
                item['join_date'] = None
                # %s-style lazy args: the old '时间转换失败:' + e raised
                # TypeError (str + Exception) and masked the real error.
                logger.error('时间转换失败:%s', e)
            item['post_count'] = user_result['legacy']['statuses_count']
            item['follow_count'] = user_result['legacy']['friends_count']
            item['fans_count'] = user_result['legacy']['followers_count']
            item['is_verified'] = str(user_result['is_blue_verified'])
            yield item
        except Exception:
            # logger.exception keeps the traceback instead of the old
            # silent bare except.
            self.logger.exception("解析response错误")
100644 --- a/spiders/MediaSpiders/MediaSpiders/utils/login_utils.py +++ b/spiders/MediaSpiders/MediaSpiders/utils/login_utils.py @@ -42,7 +42,7 @@ class login: if self.name == 'FacebookUserSpider': self.facebook_login(driver) - elif self.name == 'TwitterUserSpider': + elif self.name == 'TwitterUserSpider' or self.name == 'TwitterUserInfoSpider' : self.twitter_login(driver) elif self.name == 'wechat_links_fetcher': self.wechat_links_login(driver) diff --git a/spiders/MediaSpiders/run.py b/spiders/MediaSpiders/run.py index 89519af..217d3b9 100644 --- a/spiders/MediaSpiders/run.py +++ b/spiders/MediaSpiders/run.py @@ -19,4 +19,4 @@ dirpath = os.path.dirname(os.path.abspath(__file__)) sys.path.append(dirpath) # 等效于:scrapy crawl FacebookUserSpider -a params="{}" -execute(['scrapy', 'crawl', 'WeiboUserSpider', '-a', 'params={}']) +execute(['scrapy', 'crawl', 'TwitterUserInfoSpider', '-a', 'params={}'])