Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions GBDTLRRecModel/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>org.example</groupId>
<artifactId>GBDTLRRecModel</artifactId>
<version>1.0-SNAPSHOT</version>

<properties>
    <spark.version>2.3.2</spark.version>
    <!-- Scala binary version used as the artifact suffix of the Spark dependencies -->
    <scala.version>2.11</scala.version>
    <!-- Java 8 language level; declared exactly once so the effective value is unambiguous -->
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.target>1.8</maven.compiler.target>
</properties>

<dependencies>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-mllib -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>

<dependency>
<groupId>ml.combust.mleap</groupId>
<artifactId>mleap-runtime_2.11</artifactId>
<version>0.12.0</version>
</dependency>

<dependency>
<groupId>ml.combust.mleap</groupId>
<artifactId>mleap-spark_2.11</artifactId>
<version>0.12.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.jsuereth/scala-arm -->
<dependency>
<groupId>com.jsuereth</groupId>
<artifactId>scala-arm_2.11</artifactId>
<version>2.0</version>
</dependency>


<dependency>
<groupId>org.jpmml</groupId>
<artifactId>jpmml-sparkml</artifactId>
<version>1.4.6</version>
</dependency>

<dependency>
<groupId>org.jpmml</groupId>
<artifactId>pmml-evaluator</artifactId>
<version>1.4.3</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
<exclusions>
<exclusion>
<artifactId>httpclient</artifactId>
<groupId>org.apache.httpcomponents</groupId>
</exclusion>
</exclusions>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>
</plugins>
</build>

</project>
8 changes: 8 additions & 0 deletions GBDTLRRecModel/src/main/resources/log4j.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#log4j.rootLogger=WARN, stdout, file
# Root logger: WARN level, routed to the single "console" appender configured below.
log4j.rootLogger=WARN,console
log4j.appender.console=org.apache.log4j.ConsoleAppender
# The appender threshold is stricter than the root level, so this appender only emits ERROR and above.
log4j.appender.console.Threshold=ERROR
log4j.appender.console.ImmediateFlush=true
# Log output goes to stderr rather than stdout.
log4j.appender.console.Target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
# Layout: blank line, timestamp, newline, then [level]-[thread]-[class.method()]: message.
log4j.appender.console.layout.ConversionPattern=%n%-d{yyyy-MM-dd HH:mm:ss}%n[%p]-[Thread: %t]-[%C.%M()]: %m%n
188 changes: 188 additions & 0 deletions GBDTLRRecModel/src/main/scala/GBDTLR.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg
import org.apache.spark.mllib.linalg.{DenseVector, SparseVector, Vector}
import org.apache.spark.sql.SparkSession
//import org.apache.spark.ml.linalg.{DenseVector, SparseVector}
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.tree.configuration.{BoostingStrategy, FeatureType, Strategy}
import org.apache.spark.mllib.tree.GradientBoostedTrees
import org.apache.spark.mllib.tree.model.{GradientBoostedTreesModel, Node}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.sql.DataFrame

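/**
 * GBDT + LR: the leaves reached in a gradient-boosted tree ensemble are used as
 * features for a logistic regression model.
 *
 * Reference: "Practical Lessons from Predicting Clicks on Ads at Facebook".
 *
 * Author: lixiang, 183570397@qq.com
 */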
object GBDTLR extends Serializable {


/**
 * Trains a gradient-boosted-trees binary classifier and collects the leaf-node ids of every tree.
 *
 * Per-tree diagnostics (top node id, node count, leaf ids) are printed to stdout.
 *
 * @param train    labeled training points
 * @param numTrees number of boosting iterations requested; defaults to 40
 * @param maxDepth maximum depth of each tree; defaults to 8
 * @return the trained model, plus one array of leaf-node ids per tree in the order
 *         produced by [[getLeafNodes]] (left subtree before right subtree)
 */
def fit(train: RDD[LabeledPoint],
        numTrees: Int = 40,
        maxDepth: Int = 8): (GradientBoostedTreesModel, Array[Array[Int]]) = {
  val boostingStrategy = BoostingStrategy.defaultParams("Classification")
  boostingStrategy.setNumIterations(numTrees)
  val treeStrategy = Strategy.defaultStrategy("Classification")
  treeStrategy.setMaxDepth(maxDepth)
  treeStrategy.setNumClasses(2)
  // treeStrategy.setCategoricalFeaturesInfo(Map[Int, Int]())
  boostingStrategy.setTreeStrategy(treeStrategy)
  val gbdtModel = GradientBoostedTrees.train(train, boostingStrategy)
  // gbdtModel.save(sc, "/lx/model_gbdt")
  println("treeWeights>>>" + gbdtModel.treeWeights.toList.toString())

  // Derive the leaf arrays from the trees the model actually contains, not from the requested count.
  val treeLeafArray: Array[Array[Int]] = gbdtModel.trees.map(tree => getLeafNodes(tree.topNode))

  for ((tree, i) <- gbdtModel.trees.zipWithIndex) {
    val leaves = treeLeafArray(i)
    println(s"第i棵树>>$i>>topnodeid>>${tree.topNode.id}>>numNodes>>${tree.numNodes}>>${leaves.length}>>${leaves.mkString(",")}")
    // Sanity check: the collected leaf count next to the count expected for a binary tree.
    println(("叶子节点误差>>>", leaves.length, (tree.numNodes + 1) / 2))
  }
  (gbdtModel, treeLeafArray)
}

/**
 * Encodes one sample as the concatenation of one block per tree.
 *
 * Each block has `(numNodes + 1) / 2` slots, one per leaf. The slot of the leaf the sample
 * reaches is set to that tree's weight; every other slot stays 0.
 *
 * @param features      raw feature vector of the sample
 * @param gbdtModel     trained gradient-boosted-trees model
 * @param treeLeafArray per-tree leaf-node ids, as returned by [[fit]]
 * @return the concatenated leaf encoding as a sparse `ml.linalg` vector
 * @throws IllegalArgumentException if the reached leaf id is not listed in `treeLeafArray`
 */
def transform(features: Vector,
              gbdtModel: GradientBoostedTreesModel,
              treeLeafArray: Array[Array[Int]]): linalg.SparseVector = {
  val numTrees = gbdtModel.numTrees
  // gbdt tree is binary tree, so a tree with numNodes nodes has (numNodes + 1) / 2 leaves
  val leafCounts = Array.tabulate(numTrees)(i => (gbdtModel.trees(i).numNodes + 1) / 2)
  // offsets(i) is where tree i's block starts; offsets(numTrees) is the total length.
  val offsets = leafCounts.scanLeft(0)(_ + _)
  // Allocate the output once instead of re-copying a growing array for every tree.
  val newFeature = new Array[Double](offsets(numTrees))
  for (i <- 0 until numTrees) {
    val leafId = predictModify(gbdtModel.trees(i).topNode, features)
    val slot = treeLeafArray(i).indexOf(leafId)
    require(slot >= 0 && slot < leafCounts(i),
      s"leaf id $leafId of tree $i is not a known leaf of that tree")
    // The active slot carries the tree's weight (not a plain 1.0).
    newFeature(offsets(i) + slot) = gbdtModel.treeWeights(i)
  }
  new linalg.DenseVector(newFeature).toSparse
}

/**
 * Collects the ids of all leaves below `node`, left subtree first.
 *
 * @param node root of the (sub)tree to walk
 * @return leaf-node ids in left-to-right order
 */
def getLeafNodes(node: Node): Array[Int] =
  if (node.isLeaf) {
    Array(node.id)
  } else {
    val leftLeaves = getLeafNodes(node.leftNode.get)
    val rightLeaves = getLeafNodes(node.rightNode.get)
    leftLeaves ++ rightLeaves
  }

/**
 * Walks the tree from `node` and returns the id of the leaf that `features` ends up in.
 *
 * Continuous splits go left when the feature value is `<=` the threshold; categorical
 * splits go left when the feature value is one of the split's categories.
 *
 * @param node     current node (start with the tree's top node)
 * @param features feature vector of the sample
 * @return id of the reached leaf node
 */
def predictModify(node: Node, features: Vector): Int = {
  if (node.isLeaf) {
    node.id
  } else {
    val currentSplit = node.split.get
    val featureValue = features(currentSplit.feature)
    val goLeft = currentSplit.featureType match {
      case FeatureType.Continuous => featureValue <= currentSplit.threshold
      case _ => currentSplit.categories.contains(featureValue)
    }
    val child = if (goLeft) node.leftNode.get else node.rightNode.get
    predictModify(child, features)
  }
}

/**
 * Computes the area under the ROC curve for `predictions` and prints it to stdout.
 *
 * @param predictions DataFrame read through its "label" and "rawPrediction" columns
 */
def evaluate(predictions: DataFrame): Unit = {
  val evaluator = new BinaryClassificationEvaluator()
  evaluator.setLabelCol("label")
  evaluator.setRawPredictionCol("rawPrediction")
  evaluator.setMetricName("areaUnderROC")

  val areaUnderRoc = evaluator.evaluate(predictions)
  // Printed as a tuple, i.e. "(auc>>>,<value>)".
  println(("auc>>>", areaUnderRoc))
}

/**
 * End-to-end demo: loads the training samples, fits the GBDT, encodes each sample by the
 * leaves it reaches, fits a logistic regression on that encoding and prints the AUC.
 *
 * Note that the model is evaluated on the same data it was trained on.
 *
 * @param args optional; `args(0)` overrides the resources root directory
 *             (the CSV is read from `<root>/webroot/sampledata/trainingSamples.csv`)
 */
def main(args: Array[String]): Unit = {

  val spark = SparkSession.builder().master("local[*]").appName("GbdtAndLr").getOrCreate()
  try {
    val sc = spark.sparkContext
    sc.setLogLevel("WARN")
    Logger.getLogger("app").setLevel(Level.WARN)
    import spark.implicits._

    // Falls back to the original hard-coded location when no argument is given.
    val file_path = args.headOption.getOrElse(
      "file://" + "/Users/zhewang/Workspace/SparrowRecSys/src/main/resources")
    val ratingResourcesPath = file_path + "/webroot/sampledata/trainingSamples.csv"
    val ratingSampes = spark.read.format("csv").option("header", true).load(ratingResourcesPath)
      .withColumn("userIdInt", col("userId").cast(DoubleType))
      .withColumn("movieIdInt", col("movieId").cast(DoubleType))
      .withColumn("ratingFloat", col("rating").cast(DoubleType))
      .withColumn("rating", col("rating").cast(DoubleType))
      .withColumn("label", col("label").cast(DoubleType))

    //printSchema
    ratingSampes.printSchema()
    // show() prints by itself and returns Unit, so it must not be wrapped in println.
    ratingSampes.show(5)

    // Simple feature engineering; userEmb / itemEmb related features could be added.
    val features_names = List("movieAvgRating", "movieRatingStddev", "movieRatingCount", "userAvgRating", "userRatingStddev", "userRatingCount", "releaseYear")
    val train = ratingSampes.map(x => {
      val label = x.getAs[Double]("label")
      // These columns are not cast above, so they are still strings from the CSV reader.
      val features = features_names.map(name => {
        x.getAs[String](name).toDouble
      })
      LabeledPoint(label, new DenseVector(features.toArray))
    })

    val (gbdtModel, treeLeafArray) = fit(train.rdd)
    val gbdt_b = sc.broadcast(gbdtModel)
    println(("treeLeafArray: ", treeLeafArray.map(_.length).sum))

    // Encode every sample by the leaves it reaches in each tree.
    val trainForLr = train.map(p => {
      // transform already yields a sparse ml vector; no dense round-trip is needed.
      val leafNodes = transform(p.features, gbdt_b.value, treeLeafArray)
      org.apache.spark.ml.feature.LabeledPoint(label = p.label, features = leafNodes)
    }).toDF()
    println("leaf features")
    trainForLr.show()

    val lr = new LogisticRegression()
    val model = lr.fit(trainForLr)
    val predictions = model.transform(trainForLr)
    predictions.show()

    evaluate(predictions)
  } finally {
    // Stop the session (and its SparkContext) even if a step above fails.
    spark.stop()
  }

  println("done")
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,10 @@ def processItemSequence(spark, rawSampleDataPath):
userSeq = ratingSamples \
.where(F.col("rating") >= 3.5) \
.groupBy("userId") \
.agg(sortUdf(F.collect_list("movieId"), F.collect_list("timestamp")).alias('movieIds')) \
.withColumn("movieIdStr", array_join(F.col("movieIds"), " "))
.agg(sortUdf(F.collect_list("movieId"), F.collect_list("timestamp")).alias('movieIds'))

# userSeq.select("userId", "movieIdStr").show(10, truncate = False)
return userSeq.select('movieIdStr').rdd.map(lambda x: x[0].split(' '))
return userSeq.rdd.map(lambda x: x[1])


def embeddingLSH(spark, movieEmbMap):
Expand Down Expand Up @@ -175,8 +175,9 @@ def generateUserEmb(spark, rawSampleDataPath, model, embLength, embOutputPath, s
schema = StructType(fields)
Vectors_df = spark.createDataFrame(Vectors_list, schema=schema)
ratingSamples = ratingSamples.join(Vectors_df, on='movieId', how='inner')
result = ratingSamples.select('userId', 'emb').rdd.map(lambda x: (x[0], x[1])) \
.reduceByKey(lambda a, b: [a[i] + b[i] for i in range(len(a))]).collect()
result = ratingSamples.select('userId', 'emb').rdd.map(lambda x: (x[0], (x[1], 1))) \
.reduceByKey(lambda a, b: ([a[0][i] + b[0][i] for i in range(len(a[0]))], a[1] + b[1])) \
.map(lambda x: (x[0], [v / x[1][1] for v in x[1][0]])).collect()
with open(embOutputPath, 'w') as f:
for row in result:
vectors = " ".join([str(emb) for emb in row[1]])
Expand Down