# -*- coding: utf-8 -*-
import math
import threading
import time

import psycopg2
from numpy import *
# The collaborative-filtering recommendation algorithm mainly consists of:
# class CF:
# def __init__(self, movies, ratings, maxrating, k=5, n=10):
# self.movies = movies
# self.ratings = ratings
# self.maxrating = maxrating
# # 邻居个数
# self.k = k
# # 推荐个数
# self.n = n
# # 用户对电影的评分
# # 数据格式{'UserID:用户ID':[(MovieID:电影ID,Rating:用户对电影的评星)]}
# self.userDict = {}
# # 对某电影评分的用户
# # 数据格式:{'MovieID:电影ID',[UserID:用户ID]}
# # {'1',[1,2,3..],...}
# self.ItemUser = {}
# # 邻居的信息
# self.neighbors = []
# # 推荐列表
# self.recommandList = []
# self.cost = 0.0
# # 基于用户的推荐
# # 根据对电影的评分计算用户之间的相似度
# def recommendByUser(self, userId):
# self.formatRate()
# # 推荐个数 等于 本身评分电影个数,用户计算准确率
# self.n = len(self.userDict[userId])
# self.getNearestNeighbor(userId)
# self.getrecommandList(userId)
# self.getPrecision(userId)
# # 获取推荐列表
# def getrecommandList(self, userId):
# self.recommandList = []
# # 建立推荐字典
# recommandDict = {}
# for neighbor in self.neighbors:
# movies = self.userDict[neighbor[1]]
# for movie in movies:
# if(movie[0] in recommandDict):
# recommandDict[movie[0]] += neighbor[0]
# else:
# recommandDict[movie[0]] = neighbor[0]
# # 建立推荐列表
# for key in recommandDict:
# self.recommandList.append([recommandDict[key], key])
# self.recommandList.sort(reverse=True)
# self.recommandList = self.recommandList[:self.n]
# # 将ratings转换为userDict和ItemUser
# def formatRate(self):
# self.userDict = {}
# self.ItemUser = {}
# for i in self.ratings:
# # 评分最高为5 除以5 进行数据归一化
# temp = (i[1], float(i[2]) / self.maxrating)
# # 计算userDict {'1':[(1,5),(2,5)...],'2':[...]...}
# if(i[0] in self.userDict):
# self.userDict[i[0]].append(temp)
# else:
# self.userDict[i[0]] = [temp]
# # 计算ItemUser {'1',[1,2,3..],...}
# if(i[1] in self.ItemUser):
# self.ItemUser[i[1]].append(i[0])
# else:
# self.ItemUser[i[1]] = [i[0]]
# # 找到某用户的相邻用户
# def getNearestNeighbor(self, userId):
# neighbors = []
# self.neighbors = []
# # 获取userId评分的电影都有那些用户也评过分
# for i in self.userDict[userId]:
# for j in self.ItemUser[i[0]]:
# if(j != userId and j not in neighbors):
# neighbors.append(j)
# # 计算这些用户与userId的相似度并排序
# for i in neighbors:
# dist = self.getCost(userId, i)
# self.neighbors.append([dist, i])
# # 排序默认是升序,reverse=True表示降序
# self.neighbors.sort(reverse=True)
# self.neighbors = self.neighbors[:self.k]
# # 格式化userDict数据
# def formatuserDict(self, userId, l):
# user = {}
# for i in self.userDict[userId]:
# user[i[0]] = [i[1], 0]
# for j in self.userDict[l]:
# if(j[0] not in user):
# user[j[0]] = [0, j[1]]
# else:
# user[j[0]][1] = j[1]
# return user
# # 计算余弦距离
# def getCost(self, userId, l):
# # 获取用户userId和l评分电影的并集
# # {'电影ID':[userId的评分,l的评分]} 没有评分为0
# user = self.formatuserDict(userId, l)
# x = 0.0
# y = 0.0
# z = 0.0
# for k, v in user.items():
# x += float(v[0]) * float(v[0])
# y += float(v[1]) * float(v[1])
# z += float(v[0]) * float(v[1])
# if(z == 0.0):
# return 0
# return z / sqrt(x * y)
# # 推荐的准确率
# def getPrecision(self, userId):
# user = [i[0] for i in self.userDict[userId]]
# recommand = [i[1] for i in self.recommandList]
# count = 0.0
# if(len(user) >= len(recommand)):
# for i in recommand:
# if(i in user):
# count += 1.0
# self.cost = count / len(recommand)
# else:
# for i in user:
# if(i in recommand):
# count += 1.0
# self.cost = count / len(user)
class CFThread(threading.Thread):
    """Minimal worker thread that only announces that it ran.

    Attributes:
        threadID: caller-supplied identifier, stored unchanged.
        name: thread name (stored through ``threading.Thread.name``).
        handleUs_i: caller-supplied work list, stored unchanged; ``run`` does
            not use it.
    """

    def __init__(self, threadID, name, handleUs_i):
        # The base initializer must run first: without it, assigning ``name``
        # and calling ``start()`` both fail with "Thread.__init__() not called".
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.handleUs_i = handleUs_i

    def run(self):
        """Print a fixed marker message."""
        print("in the default run function")
# Read the data
def readFile(filename):
    """Read a "::"-delimited text file into a list of field lists.

    Args:
        filename: path of the file to read.

    Returns:
        One list of string fields per line of the file, in file order. Each
        line is stripped of surrounding whitespace before being split on "::".
    """
    # The file is decoded as ISO-8859-15; the earlier variant of this code
    # used encoding="utf-8" and switched to this one when reading failed.
    with open(filename, "r", encoding="iso-8859-15") as files:
        # Keep every parsed record (previously the record was computed and
        # then dropped, so the function always returned an empty list).
        return [line.strip().split("::") for line in files]
def getRatings():
    """Load all read records from the "iTags"."reads" table.

    This single definition replaces two consecutive definitions of the same
    name; the later one (the query without ORDER BY) was the one in effect.

    Returns:
        A list of ``[uid, aid, times]`` lists, one per row returned by the
        query. If a ``psycopg2.DatabaseError`` occurs, the error is printed
        and whatever was collected so far (possibly nothing) is returned.
    """
    data = []
    con = None
    try:
        con = psycopg2.connect(host="localhost", port="5432", database='oschina', user='postgres',
                               password='111111')
        cur = con.cursor()
        sql = 'SELECT uid,aid,times from "iTags"."reads"'
        # The statement has to be executed before fetchall() can return rows.
        cur.execute(sql)
        for item in cur.fetchall():
            data.append([item[0], item[1], item[2]])
    except psycopg2.DatabaseError as e:
        print('Error %s' % e)
    finally:
        # Release the connection whether or not the query succeeded.
        if con is not None:
            con.close()
    return data
# dic_new: is the new dict
# dic_old: is the previous dict
def changedUser(dic_new, dic_old):
    """Find, per user, the articles whose read count is new or different.

    Args:
        dic_new: current mapping ``{uid: {aid: times}}``.
        dic_old: previous mapping with the same structure.

    Returns:
        ``{uid: set_of_aids}`` containing only users with at least one changed
        article. For a user absent from ``dic_old`` every article counts as
        changed; otherwise an article is changed when it is missing from the
        old mapping or its value differs.

    Side effects:
        Prints the elapsed time and the number of changed users.
    """
    time1 = time.time()
    result = {}
    for uid, a_t_map_new in dic_new.items():
        if uid not in dic_old:
            # New user: copy the article ids into a fresh set so the result
            # does not alias the caller's inner dict.
            changedArticleSet = set(a_t_map_new)
        else:
            a_t_map = dic_old[uid]
            changedArticleSet = {
                aid
                for aid, times in a_t_map_new.items()
                if aid not in a_t_map or times != a_t_map[aid]
            }
        if changedArticleSet:
            result[uid] = changedArticleSet
    time2 = time.time()
    print("get changed user time: %.2f" % (time2 - time1))
    print("changed user number: %d" % (len(result)))
    return result
# get candidate users
# threshold is the minimum number of articles
def getCandidates(u_aSet_map, threshold):
    """Select the users that have at least ``threshold`` articles.

    Args:
        u_aSet_map: mapping ``{user: articles}``; only ``len()`` of each value
            is used, so any sized container works.
        threshold: minimum number of articles a user needs to qualify.

    Returns:
        The set of users whose article container has ``threshold`` or more
        entries.
    """
    # Users below the threshold are skipped; every other user is kept
    # (previously nothing was ever added to the result).
    return {user for user, aSet in u_aSet_map.items() if len(aSet) >= threshold}
def insertRelations(result):
    """Insert user-to-user relation values into "iTags"."user_relations".

    Args:
        result: mapping ``{uid: {uid2: relation}}``; one row is inserted for
            every ``(uid, uid2, relation)`` triple.

    Raises:
        psycopg2 errors raised while connecting or inserting propagate to the
        caller; the connection is closed first.
    """
    if not result:
        # Nothing to write, so do not open a connection at all.
        return
    con = psycopg2.connect(host="localhost", port="5432", database='oschina', user='postgres', password='111111')
    try:
        cur = con.cursor()
        for (uid, value) in result.items():
            for (uid2, relation) in value.items():
                cur.execute('insert into "iTags"."user_relations" (uid, uid2, relation) values (%s, %s, %s)', (uid, uid2, relation))
        # psycopg2 opens a transaction implicitly; without an explicit commit
        # the inserted rows are discarded when the connection goes away.
        con.commit()
    finally:
        con.close()
# def getMovies():
# data = []
# try:
# con = psycopg2.connect(host="localhost", port="5432", database='oschina', user='postgres', password='123456')
# cur = con.cursor()
# cur.execute('SELECT id,title from "iTags"."articles"')
# curs = cur.fetchall()
# for item in curs:
# temp = item
# # print(temp)
# data.append([temp[0], temp[1]])
# except psycopg2.DatabaseError as e:
# print('Error %s' % e)
# return data
# getRatings()
class myThread(threading.Thread):
    """Worker thread that refreshes similarity rows for a batch of users.

    For every user id in ``handleUs_i`` the thread collects the users listed
    in ``a_uSet_map_new`` for that user's changed articles, keeps those that
    are also in ``candidate_user_set``, computes the cosine similarity of the
    read counts for each pair, and inserts or updates the matching row of
    "iTags"."user_relations".

    Module-level names read at run time: ``changed_user_dict``,
    ``u_aSet_map_new``, ``u_a_t_map_new``, ``a_uSet_map_new``,
    ``candidate_user_set``, ``time0`` and the shared progress counter
    ``count`` (which is also incremented).
    """

    # Serializes updates of the module-level ``count`` across worker threads.
    _count_lock = threading.Lock()

    def __init__(self, threadID, name, handleUs_i):
        # The base initializer must run before ``name`` is assigned and before
        # ``start()`` can be called.
        super().__init__()
        self.threadID = threadID
        self.name = name
        self.handleUs_i = handleUs_i

    def run(self):
        """Process every user of this thread's batch over one connection."""
        con = psycopg2.connect(host="localhost", port="5432", database='oschina', user='postgres', password='111111')
        try:
            cur = con.cursor()
            print("线程" + str(self.name) + "开始,处理用户,共" + str(len(self.handleUs_i)) + "个")
            for uid in self.handleUs_i:
                self._process_user(cur, uid)
                # Commit once per user so the batched writes are persisted.
                con.commit()
        finally:
            con.close()
        print('线程' + self.name + '结束!')

    def _process_user(self, cur, uid):
        """Recompute and queue/write the relation rows for one user."""
        global count
        print("开始处理用户:" + str(uid))
        with myThread._count_lock:
            count += 1
            processed = count
        if processed % 50 == 0:
            print("处理了%d个项目,耗时:%.2f" % (processed, time.time() - time0))

        changedArticleSet = changed_user_dict[uid]
        if len(changedArticleSet) <= 0:
            return

        aSet1 = u_aSet_map_new[uid]
        denominator1 = self._squared_norm(aSet1, u_a_t_map_new[uid])

        # Relations are not computed against all users, only against the
        # candidate users that appear for one of the changed articles.
        matchedUserSet = set()
        for aid in changedArticleSet:
            matchedUserSet |= a_uSet_map_new[aid]
        matchedUserSet &= candidate_user_set
        print("match user number: %d" % (len(matchedUserSet)))

        insert_sql_strings = []  # rows to insert, written in one batch per user
        update_sql_strings = []  # rows to update, written in one batch per user
        for uid2 in matchedUserSet:
            if uid == uid2:
                relation_result = 1.0
            else:
                aSet2 = u_aSet_map_new[uid2]
                # Numerator: dot product over the articles both users have.
                numerator = 0.0
                for a in aSet1 & aSet2:
                    numerator += u_a_t_map_new[uid][a] * u_a_t_map_new[uid2][a]
                denominator2 = self._squared_norm(aSet2, u_a_t_map_new[uid2])
                relation_result = self._cosine(numerator, denominator1, denominator2)

            # Check whether a row for this pair already exists.
            cur.execute('select relation from "iTags"."user_relations" where uid=%s and uid2=%s', (uid, uid2))
            temps = cur.fetchall()
            if len(temps) == 0:
                insert_sql_strings.append([uid, uid2, relation_result])
            elif relation_result != temps[0][0]:
                # Only touch existing rows whose stored value differs.
                update_sql_strings.append([relation_result, uid, uid2])

        if len(insert_sql_strings) > 0:
            sql = 'insert into "iTags"."user_relations" (uid, uid2, relation) values (%s, %s, %s)'
            cur.executemany(sql, insert_sql_strings)
        if len(update_sql_strings) > 0:
            sql = 'update "iTags"."user_relations" set relation=%s where uid=%s and uid2=%s'
            cur.executemany(sql, update_sql_strings)

    @staticmethod
    def _squared_norm(articles, times_by_article):
        """Return the sum of squared values of ``times_by_article`` over ``articles``."""
        total = 0.0
        for a in articles:
            times = times_by_article[a]
            total += times * times
        return total

    @staticmethod
    def _cosine(numerator, denominator1, denominator2):
        """Return ``numerator / sqrt(denominator1 * denominator2)``, or 0.0 when that product is not positive."""
        product = denominator1 * denominator2
        if product <= 0:
            # Avoid ZeroDivisionError when either side has no (or all-zero) counts.
            return 0.0
        return numerator / math.sqrt(product)
# ------------------------- Start -------------------------------
# relation_result = {}
# Module-level state shared with the worker threads (they read these names as
# globals, so the loop below must stay at module level).
u_a_t_map = {}              # previous cycle: {uid: {aid: times}}
u_aSet_map = {}             # previous cycle: {uid: set of aid}
changed_user_dict = {}      # current cycle: {uid: changed article ids}
candidate_user_set = set()  # current cycle: users eligible as relation targets
count_cycle = 0
count = 0                   # number of users processed so far, across threads
time0 = time.time()
while True:
    print("a new cycle")
    count_cycle += 1
    start_time = time.time()
    u_a_t_map_new = {}
    u_at_map_new = {}
    u_aSet_map_new = {}
    a_uSet_map_new = {}
    ratings = getRatings()
    for rating in ratings:
        uid = rating[0]
        aid = rating[1]
        times = rating[2]
        u_a_t_map_new.setdefault(uid, {})[aid] = times
        # Fill the per-user and per-article sets (previously they were created
        # but left empty, so no user pair could ever match).
        u_aSet_map_new.setdefault(uid, set()).add(aid)
        a_uSet_map_new.setdefault(aid, set()).add(uid)
    # Find every user whose reads changed since the previous cycle.
    changed_user_dict = changedUser(u_a_t_map_new, u_a_t_map)
    # Find every user that relations should be computed against.
    candidate_user_set = getCandidates(u_a_t_map_new, 100)
    print("number of candidates: %d" % len(candidate_user_set))
    if len(changed_user_dict) <= 0:
        # No user changed: wait before polling again instead of immediately
        # re-querying the database in a tight loop.
        time.sleep(3)
        continue
    # Compute the results in worker threads.
    print('这是主线程:', threading.current_thread().name)
    threadNum = 8
    averageNum = math.ceil(len(changed_user_dict) / threadNum)  # users per thread
    thread_list = []
    changed_user_list = list(changed_user_dict.keys())
    for i in range(threadNum):
        start_num = i * averageNum
        end_num = start_num + averageNum
        handleUs_i = changed_user_list[start_num:end_num]
        if not handleUs_i:
            # Fewer users than threads: do not start a worker with no work.
            continue
        thread = myThread(i, str(i), handleUs_i)
        thread.start()
        thread_list.append(thread)
    # Wait for all worker threads to finish.
    for t in thread_list:
        t.join()
    # The current snapshot becomes the baseline for the next cycle.
    u_a_t_map = u_a_t_map_new
    u_aSet_map = u_aSet_map_new
    print('主线程结束!', threading.current_thread().name)
    print('一共用时:', time.time() - start_time)
    time.sleep(3)  # sleep 3 seconds
# start = time.clock()
# movies = readFile("dataset/movies.dat")
# ratings = readFile("dataset/ratings.dat")
# ratings = getRatings()
# demo = CF(movies, ratings, 1309, k=20)
# demo.recommendByUser(1784225)
# print("推荐列表为:")
# end = time.clock()
# print("the time for reading reads table is: " + str((end - start)))
# demo.showTable()
# print("处理的数据为%d条" % (len(demo.ratings)))
# print("准确率: %.2f %%" % (demo.cost * 100))
# end = time.clock()
# print("耗费时间: %f s" % (end - start))