[twitter]新增用户信息采集功能

This commit is contained in:
DELL 2026-01-21 17:52:17 +08:00
parent bf91c06801
commit 8631b0febf
6 changed files with 377 additions and 3 deletions

View File

@ -170,3 +170,28 @@ class TelegramMember(scrapy.Item):
role = scrapy.Field() # 成员角色c-创建者、a-管理员、u-普通成员
mobile = scrapy.Field() # 成员手机号
profile_photo = scrapy.Field() # 头像 blob
class TwitterUserInfoItem(scrapy.Item):
    """Scrapy Item mirroring the `twitter_user_info` database table (dsp schema)."""
    crawl_time = scrapy.Field()  # DATETIME - time this record was scraped
    is_newest = scrapy.Field()  # TINYINT(1) - whether this is the latest snapshot
    platform_type = scrapy.Field()  # VARCHAR(20) - platform type (e.g. "Twitter")
    user_id = scrapy.Field()  # BIGINT UNSIGNED - unique Twitter user ID
    username = scrapy.Field()  # VARCHAR(50) - handle (the part after the @)
    nickname = scrapy.Field()  # VARCHAR(100) - display name
    user_url = scrapy.Field()  # VARCHAR(255) - profile page URL
    avatar_url = scrapy.Field()  # VARCHAR(500) - original avatar URL
    avatar_path = scrapy.Field()  # VARCHAR(255) - local avatar file path (filled by ImagesPipeline)
    intro = scrapy.Field()  # TEXT - bio/description
    city = scrapy.Field()  # VARCHAR(100) - city/location
    join_date = scrapy.Field()  # DATETIME - account creation time
    post_count = scrapy.Field()  # INT UNSIGNED - tweet count
    is_verified = scrapy.Field()  # VARCHAR(10) - verified flag as string ("True"/"False")
    follow_count = scrapy.Field()  # INT UNSIGNED - number of accounts followed
    fans_count = scrapy.Field()  # INT UNSIGNED - follower count
    image_urls = scrapy.Field()  # consumed by scrapy ImagesPipeline to download the avatar

View File

@ -7,6 +7,9 @@ import logging
import os
import tarfile
import time
from scrapy.exceptions import DropItem
import uuid
# Define your item pipelines here
#
# Don't forget to add your pipeline to the ITEM_PIPELINES setting
@ -265,3 +268,174 @@ class TelegramDataSaveToMySQL(object):
except pymysql.err.DataError as de:
logging.error(repr(de))
return item
class TwitterUserDataSaveToMySQL(object):
    """Item pipeline that upserts TwitterUserInfoItem rows into dsp.twitter_user_info.

    A stable UUIDv5 derived from the Twitter user_id is used as the primary key,
    so the same user always maps to the same row: an existing row is updated
    only when at least one field actually changed; otherwise the item passes
    through untouched.
    """

    # Target table inside the `dsp` schema (was previously assigned lazily on
    # every process_item call; it is a constant, so declare it once).
    table_name = "twitter_user_info"

    def __init__(self):
        self.db = None
        self.cursor = None

    @staticmethod
    def _normalize_avatar_path(value):
        """Coerce avatar_path to a plain string.

        The ImagesPipeline result field may be a list of result dicts, a single
        dict, a string, or missing entirely; always return a str ('' if empty).
        """
        if isinstance(value, list) and len(value) > 0:
            first = value[0]
            return first.get('path', '') if isinstance(first, dict) else str(first)
        if isinstance(value, dict):
            return value.get('path', '')
        return str(value) if value else ''

    def open_spider(self, spider):
        """Open the MySQL connection.

        Reads the TWITTER_USER_MYSQL_DB_* settings added alongside this
        pipeline; the previous hardcoded values remain as fallbacks so
        behavior is unchanged when the settings are absent.
        """
        settings = spider.settings
        self.db = pymysql.connect(
            host=settings.get('TWITTER_USER_MYSQL_DB_HOST', '47.113.231.200'),
            port=int(settings.get('TWITTER_USER_MYSQL_DB_PORT', 28089)),
            user=settings.get('TWITTER_USER_MYSQL_DB_USER', 'root'),
            passwd=settings.get('TWITTER_USER_MYSQL_DB_PASSWD', 'passok123A'),
            db=settings.get('TWITTER_USER_MYSQL_DB_SCHEMA', 'dsp'),
            charset='utf8mb4')
        self.cursor = self.db.cursor()

    def close_spider(self, spider):
        """Release the cursor and connection when the spider finishes."""
        if self.cursor:
            self.cursor.close()
        if self.db:
            self.db.close()

    def process_item(self, item, spider):
        """Upsert one TwitterUserInfoItem; pass all other item types through."""
        if item.__class__.__name__ != 'TwitterUserInfoItem':
            return item
        # Normalize avatar_path once up front (shared helper; this logic was
        # previously duplicated verbatim in the insert path).
        item['avatar_path'] = self._normalize_avatar_path(item.get('avatar_path'))
        try:
            user_id = item.get('user_id')
            if not user_id:
                logging.warning("缺少 user_id跳过处理。")
                return item
            # 1. Stable UUID from user_id (namespace + string) — deterministic key.
            stable_uuid = str(uuid.uuid5(uuid.NAMESPACE_DNS, f"twitter_user_{user_id}"))
            # 2. Look up the existing row, if any.
            existing = self._select_by_uuid(stable_uuid)
            if existing:
                # 3./4. Update only when a field differs.
                if self._needs_update(existing, item):
                    self._update_item(stable_uuid, item)
                    logging.info(f"用户 {user_id} 数据已更新。")
                else:
                    logging.debug(f"用户 {user_id} 数据无变化,跳过更新。")
            else:
                # 5. First sighting of this user: insert a new row.
                self._insert_item_with_uuid(stable_uuid, item)
                logging.info(f"用户 {user_id} 新数据已插入。")
        except Exception as e:
            spider.logger.error(f"处理用户数据失败 (user_id={item.get('user_id')}): {e}")
            raise DropItem(f"Database error: {e}")
        return item

    def _select_by_uuid(self, record_uuid):
        """Return the full row keyed by UUID as a column->value dict, or None."""
        sql = f"SELECT * FROM dsp.{self.table_name} WHERE id = %s"
        self.cursor.execute(sql, (record_uuid,))
        row = self.cursor.fetchone()
        if row:
            columns = [desc[0] for desc in self.cursor.description]
            return dict(zip(columns, row))
        return None

    def _needs_update(self, db_record, item):
        """Return True when any comparable field differs between DB row and item.

        None and '' are treated as equivalent so a missing value never forces
        a spurious UPDATE. Bookkeeping columns and image_urls are ignored.
        """
        skip = ('id', 'created_at', 'updated_at', 'image_urls')
        for field in item.fields:
            if field in skip:
                continue
            item_val = item.get(field)
            db_val = db_record.get(field)
            if item_val == '':
                item_val = None
            if db_val == '':
                db_val = None
            if item_val != db_val:
                return True
        return False

    def _update_item(self, record_uuid, item):
        """UPDATE the non-empty item fields plus updated_at for the given row."""
        assignments = []
        params = []
        for field in item.fields:
            if field in ('id', 'created_at', 'updated_at', 'image_urls'):
                continue
            value = item.get(field)
            # Empty values never overwrite existing data.
            if value is None or value == '':
                continue
            assignments.append(f"{field} = %s")
            params.append(value)
        if not assignments:
            return
        params.append(record_uuid)  # WHERE id = %s
        sql = (f"UPDATE dsp.{self.table_name} SET {', '.join(assignments)}, "
               f"updated_at = CURRENT_TIMESTAMP WHERE id = %s")
        self.cursor.execute(sql, params)
        self.db.commit()

    def _insert_item_with_uuid(self, record_uuid, item):
        """INSERT a new row whose id column is the precomputed stable UUID."""
        cols = ['id']
        vals = [record_uuid]
        for field in item.fields:
            if field in ('image_urls', 'id'):
                continue
            value = item.get(field)
            if field == 'avatar_path':
                # Defensive re-normalization in case the item was not routed
                # through process_item first.
                value = self._normalize_avatar_path(value)
            # Skip None and empty strings entirely.
            if value is None or value == '':
                continue
            cols.append(field)
            vals.append(value)
        # Original checked `if not cols:` which was dead (cols always holds
        # 'id'); the meaningful condition is "no data columns besides id".
        if len(cols) <= 1:
            logging.warning("没有有效的字段可供插入。")
            return
        placeholders = ', '.join(['%s'] * len(cols))
        sql = f"INSERT INTO dsp.{self.table_name} ({', '.join(cols)}) VALUES ({placeholders})"
        try:
            self.cursor.execute(sql, vals)
            self.db.commit()
        except pymysql.err.IntegrityError as ie:
            # Duplicate key: row already exists (race with another writer) — skip.
            self.db.rollback()
            logging.debug(f"数据重复,已跳过插入:{ie}")
        except pymysql.err.DataError as de:
            self.db.rollback()
            logging.error(f"数据格式错误(如字段超长、类型不匹配等):{de}")
            raise
        except Exception as e:
            self.db.rollback()
            logging.error(f"数据库操作发生未知错误:{e}")
            raise

View File

@ -2,7 +2,7 @@
BOT_NAME = 'MediaSpiders'
LOG_LEVEL = 'DEBUG'
LOG_LEVEL = 'INFO'
SPIDER_MODULES = ['MediaSpiders.spiders']
NEWSPIDER_MODULE = 'MediaSpiders.spiders'
@ -34,6 +34,12 @@ MYSQL_DB_USER = 'root'
MYSQL_DB_PASSWD = 'passok123A'
MYSQL_DB_SCHEMA = 'oscm'
# MySQL connection for the Twitter user-info pipeline (dsp schema).
# NOTE(review): credentials are committed in plain text here — consider
# loading them from environment variables instead.
TWITTER_USER_MYSQL_DB_HOST = '47.113.231.200'
TWITTER_USER_MYSQL_DB_PORT = 28089
TWITTER_USER_MYSQL_DB_USER = 'root'
TWITTER_USER_MYSQL_DB_PASSWD = 'passok123A'
TWITTER_USER_MYSQL_DB_SCHEMA = 'dsp'
CRAWL_JOB_UPDATE_API = 'http://47.115.228.133:28081/api/open/crawljob'
WORD_BANK_QUERY_API = 'http://47.115.228.133:28081/api/open/wordBank/queryAll'
RULES_PARSER_QUERY_API = 'http://47.115.228.133:28081/api/rules/parser/queryPageable/0/1'

View File

@ -0,0 +1,169 @@
# -*- coding: utf-8 -*-
from datetime import datetime
from datetime import timezone, timedelta
import json
import logging as logger
import random
import re
import time
from urllib import parse
import redis
import scrapy
from scrapy_selenium import SeleniumRequest
from MediaSpiders.items import MediaspidersItem, TwitterUserInfoItem
from MediaSpiders.utils.http_utils import http_post
from MediaSpiders.utils.login_utils import login
from MediaSpiders.utils.time_utils import get_time_stamp, get_current_timestamp
def form_cookie_dict(cookie_string):
    """Parse a raw Cookie header string into a {name: value} dict.

    Pairs are separated by ';'. Each pair is split on the FIRST '=' only, so
    cookie values that themselves contain '=' (e.g. base64-padded tokens) are
    preserved intact — the original `split('=')[1]` truncated such values and
    raised IndexError on fragments without any '='. Spaces are stripped from
    the name, matching the original behavior.
    """
    cookie_dict = {}
    for pair in cookie_string.split(';'):
        name, sep, value = pair.partition('=')
        if not sep:
            # Malformed fragment with no '=': skip instead of crashing.
            continue
        cookie_dict[name.replace(' ', '')] = value
    return cookie_dict
class TwitterSpider(scrapy.Spider):
    """Collect Twitter/X user profile info via the GraphQL UserByScreenName
    endpoint and emit TwitterUserInfoItem objects for the MySQL pipeline.

    Login cookies are read from Redis; the account list to crawl comes from
    the SOCIAL_USER_QUERY_ALL_API service.
    """
    name = 'TwitterUserInfoSpider'
    custom_settings = {
        'PROTO_SAVE_FILE_NAME': 'public_twitter_user_info_data_',
        'IMAGES_STORE': r'/usr/local/temp_image/twitteruserinfo',
        # ImagesPipeline writes its download results into item['avatar_path'].
        'IMAGES_RESULT_FIELD': 'avatar_path',
        'ITEM_PIPELINES': {
            'scrapy.pipelines.images.ImagesPipeline': 2,
            'MediaSpiders.pipelines.TwitterUserDataSaveToMySQL': 300,
        },
        'SPIDER_MIDDLEWARES': {
            'MediaSpiders.middlewares.DumpFilterSpiderMiddleware': 543,
            'scrapy.spidermiddlewares.urllength.UrlLengthMiddleware': None
        }
    }

    def __init__(self, params=None, *args, **kwargs):
        """Accept an optional JSON string `params` with totalNum /
        authorization / job_id overrides."""
        super(TwitterSpider, self).__init__(*args, **kwargs)
        self.total_num = 100
        # Default public bearer token; can be overridden via params.
        self.authorization = 'Bearer AAAAAAAAAAAAAAAAAAAAANRILgAAAAAAnNwIzUejRCOuH5E6I8xnZz4puTs%3D1Zv7ttfk8LF81IUq16cHjhLTvJu4FA33AGWWjCpTnA'
        if params:
            json_params = json.loads(params)
            if 'totalNum' in json_params:
                self.total_num = int(json_params['totalNum'])
            if 'authorization' in json_params:
                self.authorization = json_params['authorization']
            if 'job_id' in json_params:
                self.job_id = json_params['job_id']

    def start_requests(self):
        # Throwaway Selenium request whose only purpose is to enter
        # login_twitter with a live driver available.
        yield SeleniumRequest(url='https://www.google.com/', callback=self.login_twitter)

    def login_twitter(self, response):
        """Load cookies from Redis, build request headers, fetch the account
        list and schedule one GraphQL profile request per account."""
        self.redis_client = redis.Redis(host=self.settings['REDIS_HOST'], port=self.settings['REDIS_PORT'],
                                        password=self.settings['REDIS_PWD'])
        self.simhash_filter_key = self.settings['TWITTER_SIMHASH_FILTER_KEY']
        # Login accounts are fetched but currently unused: the automated
        # Selenium login path (login_utils.login.login_with_selenium) is
        # disabled and cookies are taken from Redis instead.
        login_users = self.redis_client.smembers('MediaSpiders:Twitter_login_accounts')
        cookie_string = self.redis_client.get("MediaSpiders:Twitter_Cookies").decode()
        self.cookie_dict = form_cookie_dict(cookie_string)
        # 'ct0' doubles as the CSRF token required by the GraphQL API.
        ct0 = self.cookie_dict.get('ct0')
        if not ct0:
            logger.error("redis中cookie缺失ct0 (CSRF token)")
            return
        # NOTE(review): Host says api.twitter.com while requests go to x.com —
        # confirm the endpoint accepts this mismatch.
        self.header = {
            'Host': 'api.twitter.com',
            'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/143.0.0.0 Safari/537.36 Edg/143.0.0.0',
            'Accept': '*/*',
            'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6',
            'content-type': 'application/json',
            'authorization': self.authorization,
            'Origin': 'https://twitter.com',
            'Cookie': cookie_string,
            'X-Csrf-Token': ct0
        }
        self.filter_key = self.settings['TWITTER_FILTER_KEY']
        self.pid_key = self.settings['TWITTER_PID_KEY']
        url_key = self.redis_client.get("MediaSpiders:Twitter_URL_Key").decode()
        # Fetch the list of Twitter accounts to crawl from the account service.
        account_query_api = self.settings['SOCIAL_USER_QUERY_ALL_API']
        account_query_api = account_query_api.format(sortBy="id", shuffleResult="true")
        post_data = {
            'userType': self.settings['TWITTER_USER_TYPE'],
            'userFlag': 0
        }
        account_rsp = json.loads(
            http_post(account_query_api, json.dumps(post_data), headers={"Content-Type": "application/json"}).text)
        all_user_info = []
        if account_rsp['code'] == 200:
            all_user_info = account_rsp['content']
        for user_info in all_user_info:
            # Pre-encoded GraphQL query for UserByScreenName; only screen_name varies.
            graphql_url = f'https://x.com/i/api/graphql/-oaLodhGbbnzJBACb1kk2Q/UserByScreenName?variables=%7B%22screen_name%22%3A%22{user_info["userName"]}%22%2C%22withGrokTranslatedBio%22%3Afalse%7D&features=%7B%22hidden_profile_subscriptions_enabled%22%3Atrue%2C%22profile_label_improvements_pcf_label_in_post_enabled%22%3Atrue%2C%22responsive_web_profile_redirect_enabled%22%3Afalse%2C%22rweb_tipjar_consumption_enabled%22%3Atrue%2C%22verified_phone_label_enabled%22%3Afalse%2C%22subscriptions_verification_info_is_identity_verified_enabled%22%3Atrue%2C%22subscriptions_verification_info_verified_since_enabled%22%3Atrue%2C%22highlights_tweets_tab_ui_enabled%22%3Atrue%2C%22responsive_web_twitter_article_notes_tab_enabled%22%3Atrue%2C%22subscriptions_feature_can_gift_premium%22%3Atrue%2C%22creator_subscriptions_tweet_preview_api_enabled%22%3Atrue%2C%22responsive_web_graphql_skip_user_profile_image_extensions_enabled%22%3Afalse%2C%22responsive_web_graphql_timeline_navigation_enabled%22%3Atrue%7D&fieldToggles=%7B%22withPayments%22%3Afalse%2C%22withAuxiliaryUserLabels%22%3Atrue%7D'
            # NOTE(review): proxy is hardcoded to a local port — confirm this
            # should not come from settings.
            yield scrapy.Request(url=graphql_url, callback=self.parse,
                                 meta={
                                     'uid': user_info['userUid'],
                                     'uname': user_info['userName'],
                                     'proxy': 'http://127.0.0.1:10809',
                                     'currentCount': 0
                                 },
                                 cookies=self.cookie_dict, headers=self.header)

    def parse(self, response):
        """Parse one UserByScreenName GraphQL response into a TwitterUserInfoItem."""
        uid = response.request.meta['uid']
        uname = response.request.meta['uname']
        current_count = response.request.meta['currentCount']
        if current_count > 0:
            self.logger.info("翻页采集:第%s" % int(current_count / 20 + 1))
        else:
            self.logger.info("首页采集")
        try:
            rsp = json.loads(response.text)
            user_result = rsp['data']['user']['result']
            item = TwitterUserInfoItem()
            item['crawl_time'] = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))
            item['is_newest'] = 1
            item['platform_type'] = "Twitter"
            item['user_id'] = user_result['rest_id']
            # FIX: `screen_name` is the @handle and `name` is the display name;
            # the original assignment had these two swapped relative to the
            # TwitterUserInfoItem field definitions.
            item['username'] = user_result['core']['screen_name']
            item['nickname'] = user_result['core']['name']
            item['user_url'] = f'https://x.com/{uname}'
            item['avatar_url'] = user_result['avatar']['image_url']
            item['image_urls'] = [user_result['avatar']['image_url']]
            item['intro'] = user_result['legacy']['description']
            # TODO(review): 'ceshi' ("test") is a leftover placeholder; the real
            # location extraction is still to be wired up.
            item['city'] = 'ceshi'  # user_result.get('legacy', {}).get('location', {}).get('location', '').strip()
            try:
                # created_at epoch-millis shifted +8h, then formatted — i.e.
                # the stored string is effectively Beijing time (UTC+8).
                ts = get_time_stamp(
                    str(user_result['core']['created_at'])) + 8 * 3600 * 1000
                dt = datetime.fromtimestamp(ts / 1000, tz=timezone.utc)
                item['join_date'] = dt.strftime('%Y-%m-%d %H:%M:%S')  # e.g. '2012-06-28 12:25:01'
            except (ValueError, KeyError) as e:
                item['join_date'] = None
                # FIX: the original concatenated str + Exception, which itself
                # raised TypeError; log lazily instead.
                logger.error('时间转换失败:%s', e)
            item['post_count'] = user_result['legacy']['statuses_count']
            item['follow_count'] = user_result['legacy']['friends_count']
            item['fans_count'] = user_result['legacy']['followers_count']
            item['is_verified'] = str(user_result['is_blue_verified'])
            yield item
        except Exception:
            # Narrowed from a bare `except:` so KeyboardInterrupt/SystemExit
            # are no longer swallowed.
            self.logger.error("解析response错误")

View File

@ -42,7 +42,7 @@ class login:
if self.name == 'FacebookUserSpider':
self.facebook_login(driver)
elif self.name == 'TwitterUserSpider':
elif self.name == 'TwitterUserSpider' or self.name == 'TwitterUserInfoSpider' :
self.twitter_login(driver)
elif self.name == 'wechat_links_fetcher':
self.wechat_links_login(driver)

View File

@ -19,4 +19,4 @@ dirpath = os.path.dirname(os.path.abspath(__file__))
sys.path.append(dirpath)
# 等效于scrapy crawl FacebookUserSpider -a params="{}"
execute(['scrapy', 'crawl', 'WeiboUserSpider', '-a', 'params={}'])
execute(['scrapy', 'crawl', 'TwitterUserInfoSpider', '-a', 'params={}'])