After the resounding success of the first article on recommender systems, Álvaro Santos is back with some further insight into creating a recommender system.

Coming soon: A follow-up Meetup in Madrid to go even further into this exciting topic. Stay tuned!

***

In the previous article of this series, we explained what a recommender system is, describing its main parts and providing some basic algorithms which are frequently used in these systems. We also explained how to code some functions to read JSON files and to map the data in MongoDB and ElasticSearch using Spark SQL and Spark connectors.

This second part will cover:

  • Generating our Collaborative Filtering model.
  • Pre-calculating product / user recommendations.
  • Launching a small REST server to interact with the recommender.
  • Querying the data store to retrieve content-based recommendations.
  • Mixing the different types of recommendations to create a hybrid recommender.

Collaborative Filtering Algorithm

In this recommender service example, we have chosen ALS (Alternating Least Squares) as the algorithm for Collaborative Filtering. Although ALS is the only Collaborative Filtering algorithm implemented by Spark, it has been broadly tested and shown to perform well, which makes it well suited to this example project.

You can learn more about Alternating Least Squares at this link.
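
As a rough guide to what ALS optimises, the explicit-feedback version of the problem can be written as shown below. The notation is an assumption introduced here for illustration only: r_{ui} is the rating user u gave to product i, x_u and y_i are the latent factor vectors of the user and the product, K is the set of observed (user, product) pairs and \lambda is the regularisation parameter.

```latex
\min_{X,Y} \sum_{(u,i) \in K} \left( r_{ui} - x_u^{\top} y_i \right)^2
  + \lambda \left( \sum_{u} \lVert x_u \rVert^2 + \sum_{i} \lVert y_i \rVert^2 \right)
```

The algorithm alternates between fixing the product factors and solving for the user factors, and vice versa; with one side fixed, each step reduces to a regularised least squares problem. According to the Spark MLlib documentation, Spark's implementation additionally scales \lambda by the number of ratings each user or product has, so the formula above should be read as a simplified sketch rather than the exact objective Spark minimises.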

Recommender Trainer

In this section, we will code the program that will create our Collaborative Filtering model. It will pre-calculate all recommendations to ensure a faster service.

First of all, we should read all the reviews from MongoDB:

def reviewStructure(): List[StructField] = {
    List(
      StructField("_id", StringType),
      StructField("reviewId", StringType),
      StructField("userId", StringType),
      StructField("productId", StringType),
      StructField("title", StringType),
      StructField("overall", DoubleType),
      StructField("content", StringType),
      StructField("date", DateType)
    )
}

val mongoRDD = sqlContext.fromMongoDB(reviewsConfig.build(),
 Option(StructType(reviewStructure)))

mongoRDD.registerTempTable("reviews")

val ratingsDF = sqlContext.sql("select userId, productId, overall from reviews where userId is not null and productId is not null and overall is not null").cache

However, the data cannot be used by the Spark API as it comes from the DB. We must transform our ratings DataFrame into an RDD of Spark Rating objects:

val ratingsRDD = ratingsDF.map { row =>
  Rating(row.getAs[String]("userId").hashCode,
         row.getAs[String]("productId").hashCode,
         row.getAs[Double]("overall").toFloat / MAX_RATING)}
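
The snippet above turns string IDs into the integer IDs that Spark's Rating class expects by calling hashCode. Two different strings can share the same hash, in which case two users (or two products) would be merged silently. The following is a minimal sketch, in plain Scala without Spark, of the same mapping plus a simple collision check. The names IdHashing, toRatingTuple and collisions, and the MaxRating value of 5, are assumptions made for this illustration and are not part of the original code.

```scala
object IdHashing {
  // Hypothetical stand-in for the MAX_RATING constant used above; 5 is an assumption.
  val MaxRating = 5.0

  // Same idea as the snippet above: string IDs become Ints via hashCode,
  // and the rating is scaled by the maximum possible rating.
  def toRatingTuple(userId: String, productId: String, overall: Double): (Int, Int, Double) =
    (userId.hashCode, productId.hashCode, overall / MaxRating)

  // Returns every group of distinct IDs that share the same hashCode.
  def collisions(ids: Seq[String]): Map[Int, Set[String]] =
    ids.toSet
      .groupBy((id: String) => id.hashCode)
      .filter { case (_, group) => group.size > 1 }
}
```

Running collisions over the distinct user IDs and product IDs before training is one way to find out whether the hashing shortcut is safe for a given dataset. If it reports collisions, an explicit index (for example, numbering the distinct IDs) would be a safer alternative.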

Now it is time to create our ALS model:

val rank = 10
val numIterations = 10
val model = ALS.train(ratingsRDD, rank, numIterations, 0.01)

Once we have trained the model, the next step is to pre-calculate the recommendations. First, however, we should create two lists with the users and the products:

// distinct() avoids duplicated rows when these pairs are joined with the recommendations
val usersRDD = ratingsDF.map(row => row.getAs[String]("userId"))
                        .distinct()
                        .map(userId => (userId.hashCode, userId))

val productsRDD = ratingsDF.map(row => row.getAs[String]("productId"))
                           .distinct()
                           .map(productId => (productId.hashCode, productId))
val productsMap: Broadcast[Map[Int, String]] = sc.broadcast(productsRDD.collect().toMap)

Then we need to calculate the user recommendations using the Spark API and save the data to MongoDB:

val userRecs: RDD[Row] = model.recommendProductsForUsers(maxRecs)
      .join(usersRDD).map { case (userIdInt, (ratings, userId)) =>
            Row(userId, ratings.map { rating =>
                Row(productsMap.value(rating.product), rating.rating)
             }.toList)}

val userRecsConfig = ...

sqlContext.createDataFrame(userRecs, StructType(userRecsStructure))
          .saveToMongodb(userRecsConfig.build)

Finally, we have to pre-calculate the product recommendations. Spark does not provide a direct way of calculating recommendations for products, so we will measure how similar two products are using the cosine similarity of their latent factor vectors:

def cosineSimilarity(vec1: DoubleMatrix, vec2: DoubleMatrix): Double = {
  vec1.dot(vec2) / (vec1.norm2() * vec2.norm2())
}

// productsMap is a Broadcast, so the underlying Map is accessed through .value
val productsRecs = productsMap.value.toList.map { case (idInt, id) =>
  val itemFactor = model.productFeatures.lookup(idInt).head
  val itemVector = new DoubleMatrix(itemFactor)

  val sims: RDD[(String, Double)] = model.productFeatures
                                         .map { case (currentIdInt, factor) =>
    val currentId = productsMap.value(currentIdInt)

    val factorVector = new DoubleMatrix(factor)
    val sim = cosineSimilarity(factorVector, itemVector)

    (currentId, sim)
  }

  val recs = sims.filter { case (currentId, sim) => currentId != id }
    .top(maxRecs)(Ordering.by[(String, Double), Double]
                 { case (currentId, sim) => sim })
    .map { case (currentId, sim) => Row(currentId, sim) }.toList

  Row(id, recs)
}
val productRecsRDD: RDD[Row] = sc.parallelize(productsRecs)

val productRecsConfig = ...

sqlContext.createDataFrame(productRecsRDD, StructType(productRecsStructure))
  .saveToMongodb(productRecsConfig.build)
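
To make the similarity step easier to follow without a Spark cluster, here is a minimal sketch of the same idea in plain Scala collections: compute the cosine similarity between one product's factor vector and all the others, drop the product itself and keep the top N. The names CosineSketch, cosine and topSimilar are assumptions made for this illustration, and the zero-vector guard is an addition that the original code does not have.

```scala
object CosineSketch {
  // Cosine similarity between two dense vectors of equal length.
  def cosine(a: Array[Double], b: Array[Double]): Double = {
    require(a.length == b.length, "vectors must have the same length")
    val dot = a.zip(b).map { case (x, y) => x * y }.sum
    val norms = math.sqrt(a.map(x => x * x).sum) * math.sqrt(b.map(x => x * x).sum)
    // Guard against zero vectors, which would otherwise produce NaN.
    if (norms == 0.0) 0.0 else dot / norms
  }

  // For one product, rank every other product by similarity and keep the best n.
  def topSimilar(id: String,
                 factors: Map[String, Array[Double]],
                 n: Int): List[(String, Double)] = {
    val target = factors(id)
    factors.toList
      .collect { case (otherId, vec) if otherId != id => (otherId, cosine(target, vec)) }
      .sortBy { case (_, sim) => -sim }
      .take(n)
  }
}
```

Because cosine similarity only looks at the angle between two vectors, a product whose factors are a scaled copy of another's is treated as a perfect match, regardless of the vectors' lengths.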

Recommender Service

After saving all the products/reviews and the pre-calculated Collaborative Filtering recommendations in the DBs, it is time to create a simple REST service that will retrieve the final recommendations. For that purpose we have selected the Spray.io framework, which is simple, elegant and pure Scala.

Creating URL mappings for our recommender service is very simple:

startServer(interface = "localhost", port = serverPort) {
  path("recs" / "cf" / "pro") {
    post(
      entity(as[ProductRecommendationRequest]) { request =>
        complete {
          RecommenderService.getCollaborativeFilteringRecommendations(request)
                            .toStream
        }
      }
    )
  } ~
  path("recs" / "cf" / "usr") {
    post(
      entity(as[UserRecommendationRequest]) { request =>
        complete {
          RecommenderService.getCollaborativeFilteringRecommendations(request)
                            .toStream
        }
      }
    )
  } ~
  ...

Now it is time to code our recommendation services. We should start with the Collaborative Filtering recommendations. This case is simple: because they have been pre-calculated, we can just read them from MongoDB:

// User CF Recs
def parseUserRecs(o: DBObject, maxItems: Int): List[Recommendation] = {
  o.getAs[MongoDBList]("recs").getOrElse(MongoDBList())
                              .map { case (o: DBObject) => parseRec(o) }
                              .toList.sortBy(x => x.rating).reverse.take(maxItems)
}

// User recommendations are read from the user collection, keyed by userId
val listRecs = mongoClient(mongoConf.db)(ALSTrainer.USER_RECS_COLLECTION_NAME)
                 .findOne(MongoDBObject("userId" -> userId))
                 .getOrElse(MongoDBObject())

val userRecs = parseUserRecs(listRecs, MAX_RECOMMENDATIONS)

// Product CF Recs
def parseProductRecs(o: DBObject, maxItems: Int): List[Recommendation] = {
  o.getAs[MongoDBList]("recs").getOrElse(MongoDBList())
                              .map { case (o: DBObject) => parseRec(o) }
                              .toList.sortBy(x => x.rating).reverse.take(maxItems)
}

// Product recommendations are read from the product collection, keyed by productId
val listRecs = mongoClient(mongoConf.db)(ALSTrainer.PRODUCT_RECS_COLLECTION_NAME)
                 .findOne(MongoDBObject("productId" -> productId))
                 .getOrElse(MongoDBObject())

val productRecs = parseProductRecs(listRecs, MAX_RECOMMENDATIONS)

Then we should code the content-based recommendations. Although we have not pre-calculated these types of recommendations, it is quite simple to obtain them using ElasticSearch. To do this, we need to ask the server which products best match certain criteria:

def parseESResponse(response: SearchResponse): List[Recommendation] = {
  response.getHits match {
    case null => List[Recommendation]()
    case hits: SearchHits if hits.getTotalHits == 0 => List[Recommendation]()
    case hits: SearchHits if hits.getTotalHits > 0 => hits.getHits.map { hit => new Recommendation(hit.getId, hit.getScore) }.toList
  }
}

// Similar Products CB Recs
val indexName = esConf.index
val q = QueryBuilders.moreLikeThisQuery("name", "features")
          .addLikeItem(new MoreLikeThisQueryBuilder.Item(indexName, DatasetIngestion.PRODUCTS_INDEX_NAME, productId))

val similarRecs = parseESResponse(esClient.prepareSearch().setQuery(q)
                                .setSize(MAX_RECOMMENDATIONS)
                                .execute().actionGet())

// Text Search CB Recs
val indexName = esConf.index
val q = QueryBuilders.multiMatchQuery(text, "name", "features")

val textRecs = parseESResponse(esClient.prepareSearch().setQuery(q)
                                .setSize(MAX_RECOMMENDATIONS)
                                .execute().actionGet())

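For hybrid recommendations, the theory is simple: use different types of recommendations and combine their output using weights. Here, cfRatingFactor (a value between 0 and 1) weights the Collaborative Filtering results, and 1 - cfRatingFactor weights the content-based ones.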

val cbRatingFactor = 1 - cfRatingFactor

val cfRecs = findProductCFRecs(productId, ALSTrainer.MAX_RECOMMENDATIONS)
               .map(x => new HybridRecommendation(x.productId, x.rating, x.rating * cfRatingFactor))

val cbRecs = findContentBasedMoreLikeThisRecommendations(productId, ALSTrainer.MAX_RECOMMENDATIONS)
                .map(x => new HybridRecommendation(x.productId, x.rating, x.rating * cbRatingFactor))

val finalRecs = cfRecs ::: cbRecs

val hybrid = finalRecs.sortBy(x => -x.hybridRating).take(maxItems)
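
The weighting idea can also be tried out without MongoDB or ElasticSearch. The sketch below is a self-contained variation on the snippet above, not the article's implementation: the Rec type and the normalise and hybrid functions are hypothetical names introduced here. It also makes two assumptions that the original code does not: scores from each source are first scaled into [0, 1] (ElasticSearch scores and ALS ratings are not on the same scale), and a product returned by both sources gets its weighted scores added together instead of appearing twice.

```scala
object HybridSketch {
  // Hypothetical minimal type; the article's Recommendation classes may differ.
  final case class Rec(productId: String, rating: Double)

  // Scale ratings into [0, 1] by dividing by the largest rating in the list.
  def normalise(recs: List[Rec]): List[Rec] = {
    val max = if (recs.isEmpty) 0.0 else recs.map(_.rating).max
    if (max <= 0.0) recs else recs.map(r => r.copy(rating = r.rating / max))
  }

  // Weighted blend of collaborative filtering (cf) and content-based (cb) results.
  def hybrid(cf: List[Rec], cb: List[Rec], cfFactor: Double, maxItems: Int): List[Rec] = {
    require(cfFactor >= 0.0 && cfFactor <= 1.0, "cfFactor must be in [0, 1]")
    val cbFactor = 1.0 - cfFactor
    val weighted =
      normalise(cf).map(r => r.copy(rating = r.rating * cfFactor)) :::
      normalise(cb).map(r => r.copy(rating = r.rating * cbFactor))
    weighted
      .groupBy(_.productId)                                    // merge duplicates
      .map { case (id, rs) => Rec(id, rs.map(_.rating).sum) }  // add weighted scores
      .toList
      .sortBy(r => -r.rating)
      .take(maxItems)
  }
}
```

With cfFactor set to 1.0 the result contains only the Collaborative Filtering ranking, and with 0.0 only the content-based one; values in between trade one signal off against the other.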

Conclusions

In the second part of the series, we have learnt how to:

1. Create Collaborative Filtering recommendations using Spark.
2. Obtain content-based recommendations using ElasticSearch.
3. Combine several types of recommendations to create a hybrid recommender.

If you are interested in finding out more, the code is freely available in my GitHub repository.
