# cscw_2021_sponsor/collect_users_without_spons...
# aim: collect users who do not have a sponsor account but whose location is supported by the sponsor mechanism
# author: zxh
# date: 2021-06-01
import pymysql, yaml, json
from utils import *
import threading, queue  # collect data with multiple threads
with open('config.yaml', 'r') as f:
    config = yaml.load(f.read(), Loader=yaml.BaseLoader)
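# two MySQL connections: the GHTorrent mirror for user/repo/star data,
# and the sponsor study database where intermediate results are stored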
conn_delft = connectMysqlDBSpecifyDB(config, "ghtorrent0619_tudelft", autocommit = True)
cur_delft = conn_delft.cursor(pymysql.cursors.DictCursor)
conn_sponsor = connectMysqlDBSpecifyDB(config, "github-sponsors", autocommit = True)
cur_sponsor = conn_sponsor.cursor(pymysql.cursors.DictCursor)
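# lowercase two-letter codes of the countries/regions where the sponsor mechanism
# was available at data-collection time; these must match users.country_code values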
allowed_countries = [
"au",
"at",
"be",
"bg",
"ca",
"cy",
"cz",
"dk",
"ee",
"fi",
"fr",
"de",
"gr",
"hk",
"ie",
"it",
"jp",
"lv",
"lt",
"lu",
"mt",
"mx",
"nl",
"nz",
"no",
"pl",
"pt",
"ro",
"sg",
"sk",
"si",
"es",
"se",
"ch",
"uk",
"us"
]
allowed_countries = [c.lower() for c in allowed_countries]  # defensive: normalize to lowercase
result = []  # only used by the commented-out legacy code at the bottom of this file
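# read all real (non-fake, non-deleted) user accounts with a known country from the GHTorrent
# mirror, plus the users already written to the middle table so an interrupted run can resume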
cur_delft.execute("select u.id, u.login, u.country_code from users u where u.type='USR' and u.fake=0 and u.deleted=0 and u.country_code is not null")
users = cur_delft.fetchall()
print("finish reading all users")
cur_sponsor.execute("select user_id from middle_data_no_sponsor_account_users")
handled_users = cur_sponsor.fetchall()
handled_users = [user['user_id'] for user in handled_users]
print("finish reading handled users")
# find users (not organizations) who own repositories with more than 1k stars and whose location (country) information is well defined
class FindUser(threading.Thread):
    def __init__(self, q):
        threading.Thread.__init__(self)
        self.q = q
        # each worker holds its own connections, since pymysql connections are not thread-safe
        self.conn_delft = connectMysqlDBSpecifyDB(config, "ghtorrent0619_tudelft", autocommit = True)
        self.cur_delft = self.conn_delft.cursor(pymysql.cursors.DictCursor)
        self.conn_sponsor = connectMysqlDBSpecifyDB(config, "github-sponsors", autocommit = True)
        self.cur_sponsor = self.conn_sponsor.cursor(pymysql.cursors.DictCursor)

    def run(self):
        while True:
            try:
                task = self.q.get_nowait()
            except queue.Empty:
                return  # queue drained, let this worker exit
            print("tasks left in queue: %d" % (self.q.qsize()))
            user_id = task['id']
            login = task['login']
            country_code = task['country_code']
            print("handle user: %s" % (login))
            try:
                if country_code.lower() in allowed_countries:
                    # sum stars over all non-deleted repositories owned by this user, excluding self-stars
                    sum_star_num = 0
                    self.cur_delft.execute("select id from projects where owner_id=%s and deleted=0", (user_id,))
                    projects = self.cur_delft.fetchall()
                    for project in projects:
                        self.cur_delft.execute("select count(*) as star_num from watchers where repo_id=%s and user_id!=%s", (project['id'], user_id))
                        sum_star_num += self.cur_delft.fetchone()['star_num']
                    self.cur_sponsor.execute("insert into middle_data_no_sponsor_account_users (user_id, login, country_code, star_num) values (%s, %s, %s, %s)", (user_id, login, country_code, sum_star_num))
            finally:
                self.q.task_done()  # always called, so tasks.join() cannot hang on errors
THREADNUM = 50  # number of worker threads
tasks = queue.Queue()
for user in users:
    if user['id'] in handled_users:
        continue  # already handled in a previous run
    tasks.put(user)
for _ in range(THREADNUM):
    t = FindUser(tasks)
    t.start()
tasks.join()  # block until every queued user has been processed
print("finish")
# legacy single-threaded version of the filtering above, kept for reference:
# for user in users:
#     if user['country_code'].lower() not in allowed_countries:
#         continue
#     else:
#         result.append({
#             "id": user['id'],
#             "login": user['login'],
#             "country_code": user['country_code'],
#             "star_num": user['star_num']
#         })
# # find whether users have participated in the sponsor mechanism
# for user in result:
#     # 1. whether the user exists in the github_sponsor_listing table
#     cur_sponsor.execute("select * from github_sponsor_listing where login=%s", (user['login'],))
#     if cur_sponsor.fetchone() is not None:
#         continue  # already has a sponsor account
#     else:
#         pass
# legacy threaded crawler that fetched repository stargazers via the GitHub GraphQL API,
# kept for reference (relies on helpers such as base.get_token defined elsewhere):
# class whetherUserSponsorMechanism(threading.Thread):
#     def __init__(self, q, threadNo):
#         threading.Thread.__init__(self)
#         self.q = q
#         self.threadNo = threadNo
#         self.thread_interval_count = 0
#     def run(self):
#         global mutex, github_tokens, sleep_gap_token, sleep_time_tokens, list_users_tasks_table_name
#         while not self.q.empty():
#             work = self.q.get(timeout=0)
#             logging.info("Thread No: %s (%s), the number of work in queue: %s" % (self.threadNo, self.thread_interval_count, str(self.q.qsize())))
#             ownername = work[0]
#             reponame = work[1]
#             # get a suitable token and build the request header
#             github_token = base.get_token(github_tokens, sleep_time_tokens, sleep_gap_token)
#             headers = {
#                 'Authorization': 'Bearer ' + github_token,
#                 'Content-Type': 'application/json'
#             }
#             results = []
#             has_next = True
#             end_cursor = ""
#             while has_next:
#                 values = {"query": query % (ownername, reponame, end_cursor), "variables": {}}
#                 try:
#                     # request data and parse the response
#                     response = requests.post(url=url, headers=headers, json=values, timeout=40)
#                     response.encoding = 'utf-8'
#                     if response.status_code != 200:
#                         logging.error('threadNo %s: status code %s, url: %s' % (self.threadNo, response.status_code, url))
#                         mutex.acquire()
#                         sleep_time_tokens[github_token] = time.time()  # set sleep time for that token
#                         mutex.release()
#                         continue
#                     response_json = response.json()
#                     if "errors" in response_json:
#                         logging.error('threadNo %s: status code %s, url: %s, errors: %s' % (self.threadNo, response.status_code, url, json.dumps(response_json)))
#                         if response_json["errors"][0]["type"] == "RATE_LIMITED":
#                             mutex.acquire()
#                             sleep_time_tokens[github_token] = time.time()  # set sleep time for that token
#                             mutex.release()
#                             continue
#                         elif response_json["errors"][0]["type"] == "NOT_FOUND":
#                             with open('sponsor_create_repo_star_results/' + ownername + "_-_" + reponame, "w") as f:
#                                 f.write("not found")
#                         else:
#                             logging.error("unknown error, don't handle it!!!")
#                             mutex.acquire()
#                             sleep_time_tokens[github_token] = time.time()  # set sleep time for that token
#                             mutex.release()
#                             continue
#                     else:
#                         results.append(response_json)
#                     # whether there is a next page
#                     if response_json["data"]["repository"]["stargazers"]["pageInfo"]["hasNextPage"] == False:
#                         with open('sponsor_create_repo_star_results/' + ownername + "_-_" + reponame, "w") as f:
#                             json.dump(results, f)
#                     else:
#                         end_cursor = ', after:"' + response_json["data"]["repository"]["stargazers"]["pageInfo"]["endCursor"] + '"'
#                         continue
#                     # update the task queue
#                     self.q.task_done()
#                     self.thread_interval_count += 1
#                     break
#                 except Exception as e:
#                     logging.error('threadNo %s: error - %s' % (self.threadNo, str(e)))
#                     traceback.print_exc()
#                     mutex.acquire()
#                     sleep_time_tokens[github_token] = time.time()
#                     mutex.release()
#                     time.sleep(3)
# # input the results into the database
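# the legacy crawler above relies on `query` and `url` being defined elsewhere; a minimal
# sketch of what the GraphQL template likely looked like, inferred from how
# `query % (ownername, reponame, end_cursor)` and the pageInfo fields are used
# (the field selection inside `nodes` is an assumption):
# url = "https://api.github.com/graphql"
# query = '''
# query {
#   repository(owner: "%s", name: "%s") {
#     stargazers(first: 100%s) {
#       pageInfo { hasNextPage endCursor }
#       nodes { login }
#     }
#   }
# }
# '''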