74 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			74 lines
		
	
	
		
			2.3 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
#!/usr/bin/env python3
 | 
						|
# panxy, a random proxy connector
 | 
						|
 | 
						|
from urllib import request, error 
 | 
						|
import socket
 | 
						|
import sys
 | 
						|
target = "http://theresno.cloud/ip"
 | 
						|
prx_api = "https://www.proxyscan.io/api/proxy?last_check=9800&limit=10&type=http" #"http://pubproxy.com/api/proxy?limit=10&http=true&format=txt"
 | 
						|
 | 
						|
def get_list_from_api():
 | 
						|
    import json
 | 
						|
    print('Requesting proxy list... ', end='')
 | 
						|
    result = request.urlopen(prx_api)
 | 
						|
    # json magic starts here
 | 
						|
    content = result.read().decode()
 | 
						|
    json_data = json.loads(content)
 | 
						|
    prx_list = ["".join(json_data[i]["Ip"] + ':' + str(json_data[i]["Port"])) for i in range(len(json_data))]
 | 
						|
    #= result.read().decode().split('\n')
 | 
						|
    print('got {} proxies.'.format(len(prx_list)))
 | 
						|
    return prx_list
 | 
						|
 | 
						|
def test_proxy(prx):
 | 
						|
    try:
 | 
						|
        from http.client import RemoteDisconnected
 | 
						|
    except ImportError:
 | 
						|
        from httplib import BadStatusLine as RemoteDisconnected
 | 
						|
    srv_string = "Testing proxy " + prx + "... "
 | 
						|
    print(srv_string, end='\r')
 | 
						|
    proxy_handler = request.ProxyHandler({'https': 'http://' + prx + '/', 'http': 'http://' + prx + '/' })
 | 
						|
    opener = request.build_opener(proxy_handler)
 | 
						|
    try:
 | 
						|
        result = opener.open(target, timeout=10)
 | 
						|
        content = result.read().decode()
 | 
						|
        print(srv_string + 'ok. \u2705')
 | 
						|
        return True
 | 
						|
    except error.HTTPError:
 | 
						|
        print(srv_string + 'fail. \u274c')
 | 
						|
        return False
 | 
						|
    except error.URLError:
 | 
						|
        print(srv_string + 'down! \u2935')
 | 
						|
        return False
 | 
						|
    except RemoteDisconnected:
 | 
						|
         print(srv_string + 'connection fail!')
 | 
						|
         return False
 | 
						|
    except socket.timeout:
 | 
						|
        print(srv_string + 'timeout! \u23f3')
 | 
						|
        return False
 | 
						|
       
 | 
						|
def get_proxies():
 | 
						|
 | 
						|
    proxy_list = get_list_from_api()
 | 
						|
    working_list = []
 | 
						|
    start_length = len(proxy_list)
 | 
						|
    for entry in proxy_list:
 | 
						|
        if test_proxy(entry):
 | 
						|
            working_list.append(entry)
 | 
						|
        else:
 | 
						|
            pass
 | 
						|
    print('{}/{} new proxies ready.'.format(len(working_list), start_length))
 | 
						|
    return working_list
 | 
						|
 | 
						|
def main():
 | 
						|
    proxies = []
 | 
						|
    while True:
 | 
						|
        proxies += get_proxies()
 | 
						|
        print('{} proxies total ready.'.format(len(proxies)))
 | 
						|
        choice = input('retry or continue? r|c ')
 | 
						|
        if choice == 'c':
 | 
						|
            break
 | 
						|
    print(proxies)
 | 
						|
 | 
						|
if __name__ == '__main__':
 | 
						|
  main()
 |