osc/deploy/ProxyPool/Api/ProxyApi.py
2025-05-28 19:16:17 +08:00

151 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# -*- coding: utf-8 -*-
# !/usr/bin/env python
"""
-------------------------------------------------
File Name ProxyApi.py
Description : WebApi
Author : JHao
date 2016/12/4
-------------------------------------------------
Change Activity:
2016/12/04: WebApi
2019/08/14: 集成Gunicorn启动方式
-------------------------------------------------
"""
__author__ = 'JHao'
import json
import sys
import platform
import requests
from werkzeug.wrappers import Response
from flask import Flask, jsonify, request
sys.path.append('../')
from Config.ConfigGetter import config
from Manager.ProxyManager import ProxyManager
app = Flask(__name__)
class JsonResponse(Response):
@classmethod
def force_type(cls, response, environ=None):
if isinstance(response, (dict, list)):
response = jsonify(response)
return super(JsonResponse, cls).force_type(response, environ)
app.response_class = JsonResponse
api_list = {
'get': u'get an useful proxy',
# 'refresh': u'refresh proxy pool',
'get_all': u'get all proxy from proxy pool',
'delete?proxy=127.0.0.1:8080': u'delete an unable proxy',
'get_status': u'proxy number'
}
@app.route('/')
def index():
return api_list
@app.route('/get/')
def get():
proxy = ProxyManager().get()
return proxy.info_json if proxy else {"code": 0, "src": "no proxy"}
@app.route('/refresh/')
def refresh():
# TODO refresh会有守护程序定时执行由api直接调用性能较差暂不使用
# ProxyManager().refresh()
pass
return 'success'
@app.route('/get_all/')
def getAll():
proxies = ProxyManager().getAll()
return jsonify([_.info_dict for _ in proxies])
@app.route('/delete/', methods=['GET'])
def delete():
proxy = request.args.get('proxy')
ProxyManager().delete(proxy)
return {"code": 0, "src": "success"}
@app.route('/get_status/')
def getStatus():
status = ProxyManager().getNumber()
return status
@app.route('/get_balance')
def getBalance():
try:
form_data = {
'appkey': '0af3f486bb6988283af092cf24aace57',
'uid': '341358'
}
# rsp = requests.post('https://api.ipidea.net/api/open/flow_left', data=form_data)
rsp = requests.get('http://big_customer.willmam.com/index/index/get_my_balance?neek=112361&appkey=d2f6393b46afab108b038ab5b95f45d6')
rsp_str = rsp.content.decode()
rsp_json= json.loads(rsp_str)
# if rsp_json['ret_data']['flow_left'] > 0:
# rsp_json['flow_status'] = 'ok'
if rsp_json['data']['balance'] > 0:
rsp_json['balance_status'] = 'ok'
return rsp_json
except Exception as e:
return repr(e)
if platform.system() != "Windows":
import gunicorn.app.base
from six import iteritems
class StandaloneApplication(gunicorn.app.base.BaseApplication):
def __init__(self, app, options=None):
self.options = options or {}
self.application = app
super(StandaloneApplication, self).__init__()
def load_config(self):
_config = dict([(key, value) for key, value in iteritems(self.options)
if key in self.cfg.settings and value is not None])
for key, value in iteritems(_config):
self.cfg.set(key.lower(), value)
def load(self):
return self.application
def runFlask():
app.run(host=config.host_ip, port=config.host_port)
def runFlaskWithGunicorn():
_options = {
'bind': '%s:%s' % (config.host_ip, config.host_port),
'workers': 4,
'accesslog': '-', # log to stdout
'access_log_format': '%(h)s %(l)s %(t)s "%(r)s" %(s)s "%(a)s"'
}
StandaloneApplication(app, _options).run()
if __name__ == '__main__':
if platform.system() == "Windows":
runFlask()
else:
runFlaskWithGunicorn()