Mahout in Action 5

介绍

Mahout使用了Taste来提供协同过滤算法的实现,他是一个基于Java实现的可扩展的,高效的推荐引擎.Taste既实现了最基本的基于用户的和基于内容的推荐算法,同时也提供了扩展接口,使用户可以方便的定义和实现自己的推荐算法.同时,Taste不仅仅只适用于Java应用程序,它可以作为内部服务器的一个组件以HTTP和Web Service的形式向外界提供推荐逻辑.Taste的设计使它能满足企业对推荐引擎在性能,灵活性和可扩展性等方面的要求.

Taste主要包括以下几个接口:

  1. DataModel: 用户偏好数据的抽象接口,它的具体实现支持从任意类型的数据源抽取用户偏好信息.Taste默认提供JDBCDataModel和FileDataModel,分别支持从数据库文件中读取用户的偏好信息.
  2. UserSimilarity与ItemSimilarity: UserSimilarity用于定义两个用户之间的相似度,它是基于协同过滤的推荐引擎的核心部分.ItemSimilarity与之类似,用于计算物品之间的相似度.
  3. UserNeighborhood: 用于基于用户相似度的推荐方法中,推荐的内容是基于找到与当前用户喜好相似的邻域用户的方式产生的.定义了确定邻域的方法.
  4. Recommender: 是推荐引擎的抽象接口,Taste中的核心组件,程序中,它提供一个DataModel,可以计算出对不同用户的推荐内容.实际应用中,主要使用它的实现类 GenericUserBasedRecommender或GenericItemBasedRecommender,分别实现基于用户相似度的推荐引擎或基于物品相似度的推荐引擎.
  5. RecommenderEvaluator: 评分器.
  6. RecommenderIRStatsEvaluator: 搜集推荐性能相关的指标,准确率,召回率等.

目前,Mahout为DataModel提供了一下几种实现:

  • org.apache.mahout.cf.taste.impl.model.GenericDataModel
  • org.apache.mahout.cf.taste.impl.model.GenericBooleanPrefDataModel
  • org.apache.mahout.cf.taste.impl.model.PlusAnonymousUserDataModel
  • org.apache.mahout.cf.taste.impl.model.file.FileDataModel
  • org.apache.mahout.cf.taste.impl.model.hbase.HBaseDataModel
  • org.apache.mahout.cf.taste.impl.model.cassandra.CassandraDataModel
  • org.apache.mahout.cf.taste.impl.model.mongodb.MongoDBDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.SQL92JDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.MySQLJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.PostgreSQLJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.GenericJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.SQL92BooleanPrefJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.MySQLBooleanPrefJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.PostgreBooleanPrefSQLJDBCDataModel
  • org.apache.mahout.cf.taste.impl.model.jdbc.ReloadFromJDBCDataModel

从类名上就可以大概猜出来每个DataModel的用途.

UserSimilarity 与 ItemSimilarity 相似度实现由以下几种:

  1. CityBlockSimilarity: 基于Manhattan距离相似度
  2. EuclideanDistanceSimilarity: 基于欧氏距离相似度
  3. LogLikelihoodSimilarity: 基于对数似然比的相似度
  4. PearsonCorrelationSimilarity: 基于皮尔逊相关系数计算相似度
  5. SpearmanCorrelationSimilarity: 基于皮尔斯曼相关系数相似度
  6. TanimotoCoefficientSimilarity: 基于股本系数计算相似度
  7. UncenteredCosineSimilarity: 基于余弦值的相似度

UserNeighborhood的主要实现由两种:

  1. NearestNUserNeighborhood: 基于固定数量的用户邻域计算
  2. ThresholdUserNeighborhood: 基于阈值的用户邻域计算

Recommender分为以下几种实现:

  1. GenericUserBasedRecommender: 基于用户的推荐引擎
  2. GenericBooleanPrefUserBasedRecommender: 基于用户的无偏好值的推荐引擎
  3. GenericItemBasedRecommender: 基于物品的推荐引擎
  4. GenericBooleanPrefItemBasedRecommender: 基于物品的无偏好值的推荐引擎

RecommenderEvaluator有以下几种实现:

  1. AverageAbsoluteDifferenceRecommenderEvaluator: 计算平均差值
  2. RMSRecommenderEvaluator: 计算均方根差,实现类为: GenericRecommenderIRStatsEvaluator

使用依赖

<dependencies>
    <dependency>
        <groupId>org.apache.mahout</groupId>
        <artifactId>mahout-core</artifactId>
        <version>0.9</version>
    </dependency>
    <dependency>
        <groupId>org.apache.mahout</groupId>
        <artifactId>mahout-integration</artifactId>
        <version>0.11.2</version>
    </dependency>
</dependencies>

在Spark中使用

在Spark中运行推荐程序时需要将Mahout相关的jar添加到Sparl的classpath中,修改/etc/spark/conf/spark-env.sh,添加下面代码:

SPARK\_DIST\_CLASSPATH="$SPARK\_DIST\_CLASSPATH:/usr/lib/mahout/lib/\*"
SPARK\_DIST\_CLASSPATH="$SPARK\_DIST\_CLASSPATH:/usr/lib/mahout/\*"

然后,以本地模式在spark-shell中运行下面代码进行交互测试:

//注意:这里是本地目录
val model = new FileDataModel(new File("intro.csv"))

val evaluator = new RMSRecommenderEvaluator()
val recommenderBuilder = new RecommenderBuilder {
  override def buildRecommender(dataModel: DataModel): Recommender = {
    val similarity = new LogLikelihoodSimilarity(dataModel)
    new GenericItemBasedRecommender(dataModel, similarity)
  }
}

val score = evaluator.evaluate(recommenderBuilder, null, model, 0.95, 0.05)
println(s"Score=$score")

val recommender=recommenderBuilder.buildRecommender(model)
val users=trainingRatings.map(\_.user).distinct().take(20)

import scala.collection.JavaConversions.\_

val result=users.par.map{user=\>
  user+","+recommender.recommend(user,40).map(\_.getItemID).mkString(",")
}

https://github.com/sujitpal/mia-scala-examples上面有一个评估基于物品或是用户的各种相似度下的评分的类,叫做 RecommenderEvaluator,供学习参考.

分布式运行

Mahout提供了 org.apache.mahout.cf.taste.hadoop.item.RecommenderJob 类以MapReduce的方式来实现基于物品的协同过滤,查看该类的使用说明:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
$ hadoop jar /usr/lib/mahout/mahout-examples-0.9-cdh5.4.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob
15/06/10 16:19:34 ERROR common.AbstractJob: Missing required option --similarityClassname
Missing required option --similarityClassname
Usage:
[--input \<input\> --output \<output\> --numRecommendations \<numRecommendations\>
--usersFile \<usersFile\> --itemsFile \<itemsFile\> --filterFile \<filterFile\>
--booleanData \<booleanData\> --maxPrefsPerUser \<maxPrefsPerUser\>
--minPrefsPerUser \<minPrefsPerUser\> --maxSimilaritiesPerItem
\<maxSimilaritiesPerItem\> --maxPrefsInItemSimilarity \<maxPrefsInItemSimilarity\>
--similarityClassname \<similarityClassname\> --threshold \<threshold\>
--outputPathForSimilarityMatrix \<outputPathForSimilarityMatrix\> --randomSeed
\<randomSeed\> --sequencefileOutput --help --tempDir \<tempDir\> --startPhase
\<startPhase\> --endPhase \<endPhase\>]
--similarityClassname (-s) similarityClassname Name of distributed
similarity measures class to
instantiate, alternatively
use one of the predefined
similarities
([SIMILARITY_COOCCURRENCE,
SIMILARITY_LOGLIKELIHOOD,
SIMILARITY_TANIMOTO_COEFFICIEN
T, SIMILARITY_CITY_BLOCK,
SIMILARITY_COSINE,
SIMILARITY_PEARSON_CORRELATION
,
SIMILARITY_EUCLIDEAN_DISTANCE]
)

可见,该类可以接收的命令行参数如下:

  • --input(path) : 存储用户偏好数据的目录,该目录下可以包含一个或多个存储用户偏好数据的文本文件;
  • --output(path) : 结算结果的输出目录
  • --numRecommendations (integer) : 为每个用户推荐的item数量,默认为10
  • --usersFile (path) : 指定一个包含了一个或多个存储userID的文件路径,仅为该路径下所有文件包含的userID做推荐计算 (该选项可选)
  • --itemsFile (path) : 指定一个包含了一个或多个存储itemID的文件路径,仅为该路径下所有文件包含的itemID做推荐计算 (该选项可选)
  • --filterFile (path) : 指定一个路径,该路径下的文件包含了 [userID,itemID] 值对,userID和itemID用逗号分隔。计算结果将不会为user推荐 [userID,itemID] 值对中包含的item (该选项可选)
  • --booleanData (boolean) : 如果输入数据不包含偏好数值,则将该参数设置为true,默认为false
  • --maxPrefsPerUser (integer) : 在最后计算推荐结果的阶段,针对每一个user使用的偏好数据的最大数量,默认为10
  • --minPrefsPerUser (integer) : 在相似度计算中,忽略所有偏好数据量少于该值的用户,默认为1
  • --maxSimilaritiesPerItem (integer) : 针对每个item的相似度最大值,默认为100
  • --maxPrefsPerUserInItemSimilarity (integer) : 在item相似度计算阶段,针对每个用户考虑的偏好数据最大数量,默认为1000
  • --similarityClassname (classname) : 向量相似度计算类
  • outputPathForSimilarityMatrix :SimilarityMatrix输出目录
  • --randomSeed :随机种子 – sequencefileOutput :序列文件输出路径
  • --tempDir (path) : 存储临时文件的目录,默认为当前用户的home目录下的temp目录
  • --startPhase
  • --endPhase
  • --threshold (double) : 忽略相似度低于该阀值的item对

一个例子如下,使用SIMILARITY_LOGLIKELIHOOD相似度推荐物品:

1
$ hadoop jar /usr/lib/mahout/mahout-examples-0.9-cdh5.4.0-job.jar org.apache.mahout.cf.taste.hadoop.item.RecommenderJob --input /tmp/mahout/part-00000 --output /tmp/mahout-out  -s SIMILARITY_LOGLIKELIHOOD

上面命令运行完成之后,会在当前用户的hdfs主目录生成temp目录,该目录可由 --tempDir (path) 参数设置:

1
2
3
4
5
6
7
8
9
10
11
12
$ hadoop fs -ls temp
Found 10 items
-rw-r--r-- 3 root hadoop 7 2015-06-10 14:42 temp/maxValues.bin
-rw-r--r-- 3 root hadoop 5522717 2015-06-10 14:42 temp/norms.bin
drwxr-xr-x - root hadoop 0 2015-06-10 14:41 temp/notUsed
-rw-r--r-- 3 root hadoop 7 2015-06-10 14:42 temp/numNonZeroEntries.bin
-rw-r--r-- 3 root hadoop 3452222 2015-06-10 14:41 temp/observationsPerColumn.bin
drwxr-xr-x - root hadoop 0 2015-06-10 14:47 temp/pairwiseSimilarity
drwxr-xr-x - root hadoop 0 2015-06-10 14:52 temp/partialMultiply
drwxr-xr-x - root hadoop 0 2015-06-10 14:39 temp/preparePreferenceMatrix
drwxr-xr-x - root hadoop 0 2015-06-10 14:50 temp/similarityMatrix
drwxr-xr-x - root hadoop 0 2015-06-10 14:42 temp/weights

观察yarn的管理界面,该命令会生成9个任务,任务名称依次是:

  • PreparePreferenceMatrixJob-ItemIDIndexMapper-Reducer
  • PreparePreferenceMatrixJob-ToItemPrefsMapper-Reducer
  • PreparePreferenceMatrixJob-ToItemVectorsMapper-Reducer
  • RowSimilarityJob-CountObservationsMapper-Reducer
  • RowSimilarityJob-VectorNormMapper-Reducer
  • RowSimilarityJob-CooccurrencesMapper-Reducer
  • RowSimilarityJob-UnsymmetrifyMapper-Reducer
  • partialMultiply
  • RecommenderJob-PartialMultiplyMapper-Reducer

从任务名称,大概可以知道每个任务在做什么,如果你的输入参数不一样,生成的任务数可能不一样,这个需要测试一下才能确认。

在hdfs上查看输出的结果:

1
2
843 [10709679:4.8334665,8389878:4.833426,9133835:4.7503786,10366169:4.7503185,9007487:4.750272,8149253:4.7501993,10366165:4.750115,9780049:4.750108,8581254:4.750071,10456307:4.7500467]
6253 [10117445:3.0375953,10340299:3.0340924,8321090:3.0340924,10086615:3.032164,10436801:3.0187714,9668385:3.0141575,8502110:3.013954,10476325:3.0074399,10318667:3.0004222,8320987:3.0003839]

使用Java API方式执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
StringBuilder sb = new StringBuilder();
sb.append("--input ").append(inPath);
sb.append(" --output ").append(outPath);
sb.append(" --tempDir ").append(tmpPath);
sb.append(" --booleanData true");
sb.append(" --similarityClassname
org.apache.mahout.math.hadoop.similarity.

cooccurrence.measures.EuclideanDistanceSimilarity");
args = sb.toString().split(" ");


JobConf jobConf = new JobConf(conf);
jobConf.setJobName("MahoutTest");

RecommenderJob job = new RecommenderJob();
job.setConf(conf);
job.run(args);

在Scala或者Spark中,可以以Java API或者命令方式运行,最后还可以通过Spark来处理推荐的结果,例如:过滤、去重、补足数据,这部分内容不做介绍。

原文 http://blog.javachen.com/2015/06/10/collaborative-filtering-using-mahout.html