
Notes on Implementing the SVD++ Recommendation Algorithm with PySpark

Notes on the noteworthy parts of implementing SVD++ with PySpark.

The SVD++ formula
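The prediction rule being implemented is the standard SVD++ one; the symbols below correspond to the variables in the code (mu = global_mean, b_u = bu, b_i = bi, q_i = qi, p_u = pu, y_j = yj, I_u = Iu):

$$\hat{r}_{ui} = \mu + b_u + b_i + q_i^\top \left( p_u + |I_u|^{-1/2} \sum_{j \in I_u} y_j \right)$$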

Computing the loss

dot = 0  # < q_i, p_u + sum_{j in Iu} y_j / sqrt(|Iu|) >
for f in range(self.n_factors):
    dot += qi[i, f] * (pu[u, f] + u_impl_fdb[f])
err = r - (global_mean + bu[u] + bi[i] + dot)

SGD parameter updates

# update biases
bu[u] += lr_bu * (err - reg_bu * bu[u])
bi[i] += lr_bi * (err - reg_bi * bi[i])

# update factors
for f in range(self.n_factors):
    puf = pu[u, f]
    qif = qi[i, f]
    pu[u, f] += lr_pu * (err * qif - reg_pu * puf)
    qi[i, f] += lr_qi * (err * (puf + u_impl_fdb[f]) - reg_qi * qif)
    for j in Iu:
        yj[j, f] += lr_yj * (err * qif / sqrt_Iu - reg_yj * yj[j, f])
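Written out, these are the standard SVD++ SGD updates, with $e_{ui}$ the error computed above, $\gamma$ the learning rates, and $\lambda$ the regularization weights:

$$b_u \leftarrow b_u + \gamma (e_{ui} - \lambda b_u), \qquad b_i \leftarrow b_i + \gamma (e_{ui} - \lambda b_i)$$
$$p_u \leftarrow p_u + \gamma (e_{ui}\, q_i - \lambda p_u), \qquad q_i \leftarrow q_i + \gamma \Big( e_{ui} \big( p_u + |I_u|^{-1/2} \textstyle\sum_{j \in I_u} y_j \big) - \lambda q_i \Big)$$
$$y_j \leftarrow y_j + \gamma \big( e_{ui}\, |I_u|^{-1/2}\, q_i - \lambda y_j \big) \quad \text{for each } j \in I_u$$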

Reference example

算法筆記 (Algorithm Notes)

Limitations of using broadcast

  1. Time is spent on pickle serialization
  2. Object size is capped at 4 GB
  3. Bounded by the heap size <- haven't looked into why yet
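A minimal sketch of the pattern these limits apply to, assuming the factor matrices live as numpy arrays on the driver and are shipped to executors via broadcast (`sc` and a `ratings_rdd` of (user, item, rating) tuples are assumed; all names are illustrative, not from the original code):

import numpy as np

n_users, n_items, n_factors = 10000, 5000, 20
pu = np.random.normal(size=(n_users, n_factors))  # user factors on the driver
qi = np.random.normal(size=(n_items, n_factors))  # item factors on the driver

# Each broadcast pickles the whole array; large arrays pay the serialization
# cost up front and the per-object size limit applies.
bc_pu = sc.broadcast(pu)
bc_qi = sc.broadcast(qi)

# Executors read the arrays through .value
preds = ratings_rdd.map(
    lambda t: (t[0], t[1], float(bc_pu.value[t[0]].dot(bc_qi.value[t[1]]))))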

Converting an RDD containing numpy.ndarray into a DataFrame

Purpose: parameter updates are still done with RDDs, but the batched writes to disk are planned to go through a DataFrame instead.

from pyspark.mllib.random import RandomRDDs
from pyspark.ml.linalg import DenseVector

# 100 rows of 200-dim normal vectors, zipped with a row index
rdd1 = RandomRDDs.normalVectorRDD(sc, 100, 200, numPartitions=None, seed=9999)
rdd2 = rdd1.zipWithIndex()
# numpy arrays are not supported by toDF(); wrap them as DenseVector first
sdf = rdd2.map(lambda x: (DenseVector(x[0]), x[1])).toDF()

Converting a RowMatrix into a Spark DataFrame

from pyspark.mllib.linalg import DenseMatrix  # rmat below is an existing RowMatrix
import numpy
sdf = rmat.multiply(DenseMatrix(2, 2, [0, 2, 1, 3])).rows.map(lambda x: (x, )).toDF()
sdf.select(['_1']).rdd.map(lambda x: (numpy.argsort(-(x['_1'].toArray()))).tolist()).collect()  # per-row indices sorted by descending value

Writing a matrix to disk in batches

Purpose: once parameter updates finish, write the data to disk batch by batch.

# `_2` is the row index column produced by zipWithIndex() above
for threshold in range(10):
    sdf.filter(sdf['_2'] <= threshold).write.parquet(f"file:///home/tom/tmp/test_{threshold}.parquet")
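A quick way to check one of the written batches, assuming a SparkSession named `spark`:

batch0 = spark.read.parquet("file:///home/tom/tmp/test_0.parquet")
batch0.show(5)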

The number of RDD partitions matters when using glom

# Both forms create a 2-partition RDD; the partition argument is what glom() groups by
rdd = sc.parallelize(range(4 * 3), numSlices=2)
rdd = RandomRDDs.normalVectorRDD(sc, 4, 3, numPartitions=2)
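glom() returns one list per partition, so the partition count decides how many inner lists come back:

parts = rdd.glom().collect()  # list of per-partition lists of row vectors
len(parts)                    # 2, because numPartitions=2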

How to build a BlockMatrix whose blocks are each represented by an IndexedRowMatrix

# How?
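One hedged starting point: Spark's BlockMatrix stores each block as a local matrix rather than an IndexedRowMatrix, but pyspark.mllib can convert between the two distributed types, so a sketch under that assumption (the data and block sizes here are arbitrary) is:

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

rows = sc.parallelize([IndexedRow(i, [float(i), float(i) + 1.0]) for i in range(4)])
irm = IndexedRowMatrix(rows)

# Each block of the resulting BlockMatrix is a local matrix of at most 2x2
bmat = irm.toBlockMatrix(rowsPerBlock=2, colsPerBlock=2)

# And back, if the per-block rows are needed as an IndexedRowMatrix again
irm2 = bmat.toIndexedRowMatrix()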

Converting a list of lists of tuples into a list of tuples

Purpose: flatten the lists of (item, user, rank) tuples computed on each executor into a single list, which can then be converted directly into a DataFrame.

rdd.collect()
> [[(0, 1, 2), (2, 3, 4)], [(1, 2, 5), (2, 3, 3)]]
rdd.flatMap(lambda x: x).collect()
> [(0, 1, 2), (2, 3, 4), (1, 2, 5), (2, 3, 3)]
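The DataFrame step mentioned in the purpose is then a direct call (column names are illustrative):

sdf = rdd.flatMap(lambda x: x).toDF(['item', 'user', 'rank'])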

Avoiding OutOfMemory errors caused by broadcast

References:

https://stackoverflow.com/questions/38623885/spark-out-of-memory-when-broadcasting-objects

http://spark.apache.org/docs/latest/configuration.html#application-properties

From the Spark docs, on `spark.driver.memory`: "In client mode, this config must not be set through the SparkConf directly in your application, because the driver JVM has already started at that point. Instead, please set this through the --driver-memory command line option or in your default properties file."
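Besides raising driver memory, one hedged sketch on the PySpark side is to release broadcasts that are no longer needed (Broadcast.unpersist/destroy; `large_array` is illustrative):

bc = sc.broadcast(large_array)
# ... use bc.value inside RDD operations ...

bc.unpersist()  # drop the cached copies on the executors
bc.destroy()    # also release it on the driver; bc cannot be used afterwards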