# Create the Spark context and the SparkSession used by the rest of the script.
# NOTE(review): `pyspark`, `SparkSession` and `conf` are expected to be
# defined/imported earlier in the file -- confirm.
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession.builder.getOrCreate()

训练集和测试集

# Load the two rating splits (tab-separated) and the item catalogue
# (pipe-separated). No file has a header row; the explicit schemas are
# applied to each file instead.
training = spark.read.csv(
    "MovieLens.training",
    schema=schema_ratings,
    sep="\t",
    header=False,
)
test = spark.read.csv(
    "MovieLens.test",
    schema=schema_ratings,
    sep="\t",
    header=False,
)
items = spark.read.csv(
    "MovieLens.item",
    schema=schema_items,
    sep="|",
    header=False,
)

# Bare expression: presumably the last line of a notebook cell, where it
# displays the DataFrame's short representation.
training

# Print the column names, types and nullability of the training DataFrame.
training.printSchema()
 |-- user_id: integer (nullable = true)
 |-- item_id: integer (nullable = true)
 |-- rating: integer (nullable = true)
 |-- timestamp: integer (nullable = true)

items中的item_id和对应的电影名:

+-------+--------------------+
|item_id|               movie|
+-------+--------------------+
|      1|    Toy Story (1995)|
|      2|    GoldenEye (1995)|
|      3|   Four Rooms (1995)|
|      4|   Get Shorty (1995)|
|      5|      Copycat (1995)|
|      6|Shanghai Triad (Y...|
|      7|Twelve Monkeys (1...|
|      8|         Babe (1995)|
|      9|Dead Man Walking ...|
|     10|  Richard III (1995)|
|     11|Seven (Se7en) (1995)|
|     12|Usual Suspects, T...|
|     13|Mighty Aphrodite ...|
|     14|  Postino, Il (1994)|
|     15|Mr. Holland's Opu...|
|     16|French Twist (Gaz...|
|     17|From Dusk Till Da...|
|     18|White Balloon, Th...|
|     19|Antonia's Line (1...|
|     20|Angels and Insect...|
+-------+--------------------+

使用ALS算法进行训练

from pyspark.ml.recommendation import ALS

# Configure the ALS collaborative-filtering estimator. The rating column is
# left at ALS's default; coldStartStrategy='drop' is passed so that, per the
# Spark docs, rows with NaN predictions are dropped at transform time.
als = ALS(
    rank=10,
    maxIter=10,
    regParam=0.1,
    userCol="user_id",
    itemCol="item_id",
    coldStartStrategy="drop",
)

# Fit the factorization model on the training ratings.
models = als.fit(training)

使用RegressionEvaluator进行评估

from pyspark.ml.evaluation import RegressionEvaluator

# RMSE evaluator comparing the true 'rating' column with the model's
# 'prediction' column.
evaluator = RegressionEvaluator(
    metricName="rmse",
    labelCol="rating",
    predictionCol="prediction",
)

# Score the held-out test set with the fitted model, then compute the RMSE.
predictions = models.transform(test)
rmse = evaluator.evaluate(predictions)

top-k推荐

得到top-k的movie_id和对应的rating
# Ask the fitted model for the single top recommendation for every user.
top1 = models.recommendForAllUsers(numItems=1)

筛选出movie_id
# Add a 'movie_id' column holding the item_id of the first (only) entry in
# each user's 'recommendations' array.
recommend_item = top1.withColumn(
    "movie_id",
    top1["recommendations"]["item_id"].getItem(0),
)

得到被推荐最多次的电影id
# Imported here so this step is self-contained; no import of `count`/`desc`
# is visible in this part of the file.
from pyspark.sql.functions import count, desc

# Count how many users were recommended each movie and order the result so
# the most frequently recommended movie comes first.
recommend_most = (
    recommend_item
    .groupby("movie_id")
    .agg(count("*").alias("counts"))
    .sort(desc("counts"))  # the original never closed this call (syntax error)
)

通过和items进行join得到电影名