Notes on Implementing the SVD++ Recommendation Algorithm in PySpark
A record of the details worth writing down while implementing SVD++ with PySpark.
SVD++ formula
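For reference, the prediction rule the code below implements is the standard SVD++ formula (notation follows the Surprise reference: mu is the global mean, I_u is the set of items rated by user u):

$$\hat{r}_{ui} = \mu + b_u + b_i + q_i^\top\Big(p_u + \frac{1}{\sqrt{|I_u|}}\sum_{j \in I_u} y_j\Big)$$

and the per-rating error used below is err = r_ui - \hat{r}_{ui}.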
Computing the loss
dot = 0  # <q_i, p_u + sum_{j in Iu} y_j / sqrt(|Iu|)>
for f in range(self.n_factors):
    dot += qi[i, f] * (pu[u, f] + u_impl_fdb[f])
err = r - (global_mean + bu[u] + bi[i] + dot)  # prediction error for user u, item i, rating r
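The loop above assumes Iu and u_impl_fdb are already available for user u; in the Surprise reference they are precomputed per rating roughly like this (trainset.ur is Surprise's user-to-ratings mapping, np is numpy):

import numpy as np

Iu = [j for (j, _) in trainset.ur[u]]            # items rated by user u
sqrt_Iu = np.sqrt(len(Iu))
u_impl_fdb = np.zeros(self.n_factors, np.double)
for j in Iu:                                     # accumulate sum_{j in Iu} y_j / sqrt(|Iu|)
    for f in range(self.n_factors):
        u_impl_fdb[f] += yj[j, f] / sqrt_Iu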
SGD parameter updates
# update biases
bu[u] += lr_bu * (err - reg_bu * bu[u])
bi[i] += lr_bi * (err - reg_bi * bi[i])

# update factors
for f in range(self.n_factors):
    puf = pu[u, f]
    qif = qi[i, f]
    pu[u, f] += lr_pu * (err * qif - reg_pu * puf)
    qi[i, f] += lr_qi * (err * (puf + u_impl_fdb[f]) - reg_qi * qif)
    for j in Iu:
        yj[j, f] += lr_yj * (err * qif / sqrt_Iu - reg_yj * yj[j, f])
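These updates are written for a single process. One way to run them with PySpark, following the spark-FM-parallelSGD reference rather than anything confirmed in these notes, is to broadcast the current parameters, run local SGD inside each partition, and average the per-partition copies on the driver. A rough sketch (local_sgd_update is a hypothetical helper wrapping the per-rating updates above):

import numpy as np

def run_epoch(sc, ratings_rdd, params):
    # params: dict of numpy arrays, e.g. {'bu', 'bi', 'pu', 'qi', 'yj'}
    bc = sc.broadcast(params)                     # ship the current parameters to every executor

    def local_sgd(partition):
        local = {k: v.copy() for k, v in bc.value.items()}
        for (u, i, r) in partition:               # each partition holds (user, item, rating) triples
            local_sgd_update(local, u, i, r)      # hypothetical helper: the bias/factor updates above
        yield local

    copies = ratings_rdd.mapPartitions(local_sgd).collect()
    bc.unpersist()
    # parameter averaging across partitions, as in the spark-FM-parallelSGD reference
    return {k: np.mean([c[k] for c in copies], axis=0) for k in params}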
Reference examples
- How Yelp handles large matrix multiplication with PySpark: https://engineeringblog.yelp.com/2018/05/scaling-collaborative-filtering-with-pyspark.html
- PySpark implementation of Factorization Machines: https://github.com/blebreton/spark-FM-parallelSGD/blob/master/fm/fm_parallel_sgd.py
- Scala implementation of Factorization Machines: https://github.com/zhengruifeng/spark-libFM/blob/master/src/main/scala/org/apache/spark/mllib/regression/FMWithSGD.scala
- Python (Surprise) implementation of SVD++: https://github.com/NicolasHug/Surprise/blob/master/surprise/prediction_algorithms/matrix_factorization.pyx#L283
- Discussion of the Parameter Server approach: https://kknews.cc/zh-tw/code/y5aejon.html
Algorithm notes
Limitations of using broadcast
- Time is spent on pickle serialization (the sketch after this list shows where that happens)
- Object size is capped at 4 GB
- Bounded by the heap size <- haven't looked into why yet
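For context, the limits above apply to the pattern below: the whole numpy array is pickled once on the driver and deserialized on each executor that reads .value (sizes here are only illustrative):

import numpy as np

pu = np.random.normal(size=(1000, 20))    # user-factor matrix; the real, much larger ones are what hit the limits
bc_pu = sc.broadcast(pu)                  # pickle serialization happens here, on the driver
scores = sc.parallelize(range(1000)).map(lambda u: float(bc_pu.value[u].sum())).collect()
bc_pu.unpersist()                         # drop the executor-side copies when no longer needed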
Converting an RDD of numpy.ndarray into a DataFrame
Goal: parameter updates still happen on RDDs, but the batched writes to disk are meant to go through a DataFrame instead.
from pyspark.mllib.random import RandomRDDs
from pyspark.ml.linalg import DenseVector

rdd1 = RandomRDDs.normalVectorRDD(sc, 100, 200, numPartitions=None, seed=9999)  # 100 rows x 200 cols of N(0, 1)
rdd2 = rdd1.zipWithIndex()                                                      # attach a row index to each ndarray
sdf = rdd2.map(lambda x: (DenseVector(x[0]), x[1])).toDF()                      # wrap the ndarray so toDF() can infer a schema
Converting a RowMatrix into a Spark DataFrame
from pyspark.mllib.linalg import DenseMatrix
import numpy
sdf = rmat.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.map(lambda x: (x, )).toDF()  # rmat: an existing RowMatrix with 2 columns
sdf.select(['_1']).rdd.map(lambda x: (numpy.argsort(-(x['_1'].toArray()))).tolist()).collect()  # per row, array indices sorted by descending value
Writing the matrix to disk in batches
Goal: once the parameter updates are done, write the data to disk batch by batch.
# write disjoint index ranges (sdf['_2'] is the row index from zipWithIndex), 10 rows per batch, one parquet file each
for threshold in range(10):
    sdf.filter((sdf['_2'] >= threshold * 10) & (sdf['_2'] < (threshold + 1) * 10)) \
        .write.parquet(f"file:///home/tom/tmp/test_{threshold}.parquet")
Using glom
Pay attention to the RDD's number of partitions, since glom returns one list per partition.
rdd = sc.parallelize(range(4 * 3), numSlices=2)              # parallelize a plain collection into 2 partitions
rdd = RandomRDDs.normalVectorRDD(sc, 4, 3, numPartitions=2)  # or set the partition count when generating the RDD
rdd.glom().collect()                                         # one list of rows per partition
How to build a BlockMatrix whose blocks are represented as an IndexedRowMatrix
# How?
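The note leaves this open. One possible direction, assuming the built-in pyspark.mllib.linalg.distributed conversions are enough (a BlockMatrix's blocks are local matrices, so an IndexedRowMatrix view has to be recovered by converting back):

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

irm = IndexedRowMatrix(sc.parallelize([IndexedRow(i, [float(i), 2.0 * i]) for i in range(4)]))
bmat = irm.toBlockMatrix(rowsPerBlock=2, colsPerBlock=2)  # blocks are ((row_block, col_block), local matrix) pairs
back = bmat.toIndexedRowMatrix()                          # recover row-wise (IndexedRow) access when needed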
Converting a list of lists of tuples into a list of tuples
Goal: flatten the per-executor lists of (item, user, rank) tuples into a single flat list, which can then be converted directly into a DataFrame.
rdd.collect()
> [[(0, 1, 2), (2, 3, 4)], [(1, 2, 5), (2, 3, 3)]]
rdd.flatMap(lambda x: x).collect()
> [(0, 1, 2), (2, 3, 4), (1, 2, 5), (2, 3, 3)]
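Following the goal above, the flattened RDD can then be turned into a DataFrame in one step (the column names here are only illustrative):

sdf = rdd.flatMap(lambda x: x).toDF(['item', 'user', 'rank'])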
Avoiding OutOfMemory caused by broadcast
References:
https://stackoverflow.com/questions/38623885/spark-out-of-memory-when-broadcasting-objects
http://spark.apache.org/docs/latest/configuration.html#application-properties
Note: In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file.
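In practice that means raising the driver heap from outside the application, e.g. something like `spark-submit --driver-memory 8g main.py` (the value and script name are just placeholders) or setting spark.driver.memory in spark-defaults.conf, since the broadcast pickling happens inside the driver JVM's heap.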