Movie Recommender

Let’s create a movie recommender based on ratings. In this example we have a collection of movies, a set of users, and ratings (1 to 5) that users gave to movies. These ratings are sparse, since each user rates only a small fraction of all movies, and biased, since different users distribute their ratings differently. Our goal is to take any user ID and return recommended movies for that user.

There are five parts to this recommender system:

  • A dataset of movie ratings.
  • Two deep learning models, one for embedding movies and one for embedding users.
  • A vector index to perform similarity search on those embeddings.
  • A custom deep ranking model to score user-movie pairs and further improve the relevance of the recommended movies.

We will use Pinecone to tie everything together and expose the recommender as a real-time service that will take any user ID and return relevant movie recommendations.

The architecture of our recommender system is shown below. In the “write” path (load), we start with 1,682 movie IDs and transform each one into a vector embedding. The embedding function is trained so that proximity between movies in the multi-dimensional space reflects how likely a single user is to rate both movies similarly. The 1,682 embeddings are then stored in the vector index.

Architecture sketch of a movie recommender system

In the “read” path (query), an embedding function transforms a given user ID into an embedding in the same vector space as the movies, representing the user’s movie preference. Candidate movie recommendations are then fetched based on proximity to the user’s location in the multi-dimensional space. Finally, these candidates are ranked based on the custom deep ranking model.
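
Before building the real thing, here is a tiny self-contained sketch of the idea using random vectors in place of trained embeddings (illustrative only; none of these names are part of the actual pipeline below):

import numpy as np

rng = np.random.default_rng(0)

# "Write" path: one 32-dimensional embedding per movie, stored in an index (here, a dict).
movie_vecs = {f"movie_{i}": rng.normal(size=32) for i in range(1682)}

# "Read" path: embed the user, retrieve candidates by dot product, then re-rank them.
user_vec = rng.normal(size=32)
candidates = sorted(movie_vecs, key=lambda m: movie_vecs[m] @ user_vec, reverse=True)[:50]

popularity = {m: rng.random() for m in movie_vecs}  # made-up signal, just so re-ranking changes the order

def toy_score(u, m_vec, pop):
    # stands in for the learned pairwise ranking model used later
    return float(u @ m_vec) + pop

top5 = sorted(candidates, key=lambda m: toy_score(user_vec, movie_vecs[m], popularity[m]), reverse=True)[:5]
print(top5)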

Prepare Data and Models

Install and Import Relevant Python Packages

!pip install --quiet scikit-learn pandas matplotlib
!pip install --quiet tensorflow
!pip install --quiet -U tfds-nightly
!pip install --quiet -U pinecone-client

import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds

Prepare Movie Rating Data

We are using the standard MovieLens 100K dataset, which contains 100,000 ratings (1 to 5) from 943 users on 1,682 movies.

# Load data
_ratings = tfds.load("movielens/100k-ratings", split="train")
_movies = tfds.load("movielens/100k-movies", split="train")

# Collect data that we'd use throughout this notebook
class DATA:
    ratings = None
    train = None
    test = None

DATA.ratings = _ratings.map(lambda x: {
    "movie_title": x["movie_title"],
    "user_id": x["user_id"],
    "user_rating": x["user_rating"]  # We keep the ratings of a user-movie pair
})

# Split data into training and testing
shuffled = DATA.ratings.shuffle(100_000, reshuffle_each_iteration=False)
DATA.train = shuffled.take(80_000)
DATA.test = shuffled.skip(80_000).take(20_000)
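
To confirm the structure of each record, we can peek at a single example (a quick check; the exact record depends on the shuffle):

for example in DATA.train.take(1).as_numpy_iterator():
    print(example)  # e.g. {'movie_title': b'...', 'user_id': b'...', 'user_rating': 4.0}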

Load Premade Movie and User Embedding Models

We use premade movie and user embedding models built by following the retrieval tutorial from the TensorFlow Recommenders library. The models take a user ID or a movie title and output a corresponding 32-dimensional vector representation. They were trained to preserve “similarity”: users and movies with similar rating profiles produce embeddings with high inner-product values. For more information, follow the TensorFlow Recommenders tutorial.

# extract the premade models
import tarfile
with tarfile.open("premade-models.tgz") as tfile:
    tfile.extractall()

user_model = tf.keras.models.load_model('tf_retrieval_user_model')
movie_model = tf.keras.models.load_model('tf_retrieval_movie_model')

Load Premade Pairwise Ranking Model

We also prepared a pairwise scoring model by following the ranking tutorial from the TensorFlow Recommenders library. The pairwise scoring model receives a concatenated user-movie embedding pair and returns a relevance score indicating how relevant the movie is to the user.

scoring_model = tf.keras.models.load_model('tf_ranking_pairwise_score')

Sanity-Check the Models

Let’s run the three premade models locally. The user and movie models take an ID or a title, respectively, and output a vector. The scoring model takes the two vectors concatenated together and outputs a single score value. Later, we will upload these models to Pinecone’s model hub.

user_embedding = user_model(np.array(["42"]))
movie_embedding = movie_model(np.array(["One Flew Over the Cuckoo's Nest (1975)"]))
pairwise_score = scoring_model(tf.concat([user_embedding, movie_embedding], axis=1))
print(f"user embedding: {user_embedding}")
print(f"movie embedding: {movie_embedding}")
print(f"pairwise score: {pairwised_score}")
user embedding: [[ 0.01683676  0.4281152   0.4133582  -0.12968868  0.12420761  0.50607353
   0.08321043 -0.1869202  -0.1490686  -0.4416855   0.10145921  0.29046464
   0.29867113  0.5125411  -0.19146752  0.0899531  -0.47919908 -0.35008752
  -0.49349973 -0.21783432 -0.10766218 -0.32919145 -0.37802044 -0.39593038
  -0.19613707 -0.06320453 -0.05047337 -0.42567044 -0.20733188  0.45100415
   0.03500173 -0.15795036]]
movie embedding: [[ 0.60710007 -0.48249206 -0.35593712  0.13949828  0.40926376  0.31134516
   0.2955666   0.08130229  0.29382494 -0.14615472 -0.17722447  0.51746374
   0.1466727   0.50280887 -0.09506559  0.57231975  0.35222292  0.35850292
  -0.16756673 -0.17023738 -0.29335937  0.1672158  -0.2980432  -0.5691722
   0.25732362 -0.35206133  0.395624    0.29205453  0.24314538  0.13837588
   0.15536724  0.10586588]]
pairwise score: [[3.765619]]
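
The vector index we configure below retrieves movies by maximum dot product between user and movie embeddings, while the scoring model produces the separate re-ranking score printed above. We can compute the raw dot-product similarity locally to see the difference (a quick check using the embeddings we just computed; the value is not expected to match the pairwise score):

# Raw retrieval-style similarity: dot product between the user and movie embeddings.
retrieval_score = (user_embedding.numpy() @ movie_embedding.numpy().T)[0, 0]
print(f"dot-product retrieval score: {retrieval_score:.4f}")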

Configure Pinecone

This section shows how to use Pinecone to easily build and deploy a movie recommendation engine that turns raw data into vector embeddings, maintains a live index of those vectors, and returns recommended movies on demand.

The starting point is the premade movie and user embedding models, and the pairwise deep ranking model. Next, we show how to upload these models to Pinecone’s model hub. Then, we define how to use them in Pinecone’s index graph. Finally, we start a service and retrieve recommendations for an arbitrary user ID.

Install and Set Up Pinecone

import pinecone.graph
import pinecone.service
import pinecone.connector
import pinecone.hub
import pandas as pd

Set your API key:

# Load Pinecone API key

api_key = 'FILL_IN_YOUR_API_KEY'
pinecone.init(api_key=api_key)

(Need an API key? Get it here.)

Define and Upload Models

Pinecone lets you add models for preprocessing raw data during writes (data upserts) and reads (queries). This is done by defining a graph, or the series of steps any upsert, delete, query, or fetch request must go through. Each step in the graph is a docker image that gets uploaded to the Pinecone model hub. The model hub API provides utilities and abstractions for creating and uploading these docker images.

movie_image_builder = pinecone.hub.ImageBuilder(
    image="tfrs-explicit-movie-preprocessor:demo1",  # The name of the docker image (you should also tag the image)
    build_path="./docker_build/movie_preprocessor/demo1",  # path to which docker build artifacts are saved
    model_path='./movie_preprocessor.py',  # main model file
    pip=['tensorflow==2.3.0', 'tensorflow-recommenders'],  # additional pip packages needed
    data_paths=['./movie_preprocessor_tfmodel'],  # additional files or directories needed
)

user_image_builder = pinecone.hub.ImageBuilder(
    image="tfrs-explicit-user-preprocessor:demo1",  # The name of the docker image (you should also tag the image)
    build_path="./docker_build/user_preprocessor/demo1",  # path to which docker build artifacts are saved
    model_path='./user_preprocessor.py',  # main model file
    pip=['tensorflow==2.3.0'],  # additional pip packages needed
    data_paths=['./user_preprocessor_tfmodel'],  # additional files or directories needed
)

ranking_image_builder = pinecone.hub.ImageBuilder(
    image="tfrs-explicit-ranking-postprocessor:demo1",  # The name of the docker image (you should also tag the image)
    build_path="./docker_build/ranking_postprocessor/demo1",  # path to which docker build artifacts are saved
    model_path='./ranking_postprocessor.py',  # main model file
    pip=['tensorflow==2.3.0'],  # additional pip packages needed
    data_paths=['./ranking_postprocessor_tfmodel'],  # additional files or directories needed
)

Log Into the Pinecone Model Hub

login_cmd = pinecone.hub.get_login_cmd()
!{login_cmd}

Serialize Movie and User Preprocessors

movie_model.save(movie_image_builder.data_paths[0])

user_model.save(user_image_builder.data_paths[0])

scoring_model.save(ranking_image_builder.data_paths[0])

Create Pinecone-Compatible Model Files

Pinecone’s index graph contains preprocessing and postprocessing units. Preprocessors transform the inputs of upsert, delete, and query requests; postprocessors transform the outputs the index returns (in this example, the query results that we re-rank).

Create the Movie Embedding Preprocessor Model

The movie preprocessor is a Python class decorated with pinecone.hub.preprocessor. Our recommender service indexes and retrieves movie vector embeddings, so the movie preprocessor transforms movie titles (which serve as item IDs) into vectors before they are upserted into the index.

%%writefile {movie_image_builder.model_path}

import numpy as np
import tensorflow as tf
from pinecone.hub import preprocessor

# by default all the data files on Pinecone are placed in the ./data directory.
DATA_PREFIX = './data'

@preprocessor
class MoviePreprocessor:
    def __init__(self):
        self.tfmodel = tf.keras.models.load_model('{0}/movie_preprocessor_tfmodel'.format(DATA_PREFIX))  # Load tensorflow model

    def transform(self, vectors):
        embeddings = self.tfmodel(np.array(vectors)).numpy()
        return embeddings
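
Before packaging, we can optionally sanity-check the serialized movie model from the local path it was saved to above (inside the hub image the same model is instead loaded from the ./data prefix):

# Reload the serialized movie model and confirm it still produces 32-dimensional embeddings.
saved_movie_model = tf.keras.models.load_model(movie_image_builder.data_paths[0])
print(saved_movie_model(np.array(["One Flew Over the Cuckoo's Nest (1975)"])).numpy().shape)  # expected: (1, 32)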

Create the User Embedding Preprocessor Model

Likewise, the user preprocessor is a Python class decorated with pinecone.hub.preprocessor. The service recommends movies that closely match a user’s rating profile, so the user preprocessor transforms user IDs into vector embeddings before querying the service’s index.

%%writefile {user_image_builder.model_path}

import numpy as np
import tensorflow as tf
from pinecone.hub import preprocessor

# by default all the data files on Pinecone are placed in the ./data directory.
DATA_PREFIX = './data'

@preprocessor
class UserPreprocessor:
    def __init__(self):
        self.tfmodel = tf.keras.models.load_model('{0}/user_preprocessor_tfmodel'.format(DATA_PREFIX))  # Load tensorflow model

    def transform(self, vectors):
        embeddings = self.tfmodel(np.array(vectors)).numpy()
        return embeddings

Create the Pairwise Scoring Postprocessor Model

The scoring postprocessor is a Python class decorated with pinecone.hub.postprocessor. Recall that the index retrieves and ranks movies by maximum dot-product similarity, yet we want to rank the final results with our custom premade ranking model. At query time, the scoring postprocessor receives the query results and re-ranks them using the pairwise scoring model.

%%writefile {ranking_image_builder.model_path}

import numpy as np
import tensorflow as tf
from pinecone.hub import postprocessor, QueryResult

# by default all the data files on Pinecone are placed in the ./data directory.
DATA_PREFIX = './data'

@postprocessor
class RankingPostprocessor:
    def __init__(self):
        self.tfmodel = tf.keras.models.load_model('{0}/ranking_postprocessor_tfmodel'.format(DATA_PREFIX))  # Load tensorflow model

    def transform(self, queries, matches):
        output = []
        for q, match in zip(queries, matches):
            updated_scores = [(i, self.tfmodel(tf.concat([[q], [v]], axis=1)).numpy().flatten()[0]) for i, v in enumerate(match.data)]
            sorted_inx_score = sorted(updated_scores, key=lambda i_s: -i_s[1])
            new_scores = [s for _,s in sorted_inx_score]
            new_ids = [list(match.ids)[i] for i,_ in sorted_inx_score]
            new_data = np.array([list(match.data)[i] for i,_ in sorted_inx_score])
            output.append(QueryResult(ids=new_ids, scores=new_scores, data=new_data))

        return output

Upload Models to the Pinecone Model Hub

Package all of the Artifacts into the Image Build Directory

movie_image_builder.package(exist_ok=True)
user_image_builder.package(exist_ok=True)
ranking_image_builder.package(exist_ok=True)

Build the Model Docker Images and Push Them to the Model Hub

The model hub API provides utilities for creating and pushing the docker images.

# build and push each model image if it does not already exist in the model hub
for builder in (movie_image_builder, user_image_builder, ranking_image_builder):
    repo, tag = builder.image.split(':')
    if repo not in pinecone.hub.list_repositories() or tag not in pinecone.hub.list_repository_tags(repo):
        !{builder.get_build_cmd()}
        !{builder.get_push_cmd()}

Deploy and Connect to the Service

Next, we deploy the Pinecone service so it can receive and respond to search queries.

graph = pinecone.graph.IndexGraph(metric='dotproduct')

# Name of the hub images
movie_image_name = pinecone.hub.as_user_image(movie_image_builder.image)
user_image_name = pinecone.hub.as_user_image(user_image_builder.image)
ranking_image_name = pinecone.hub.as_user_image(ranking_image_builder.image)

# Add to the graph functions that will transform the items and the queries.
movie_preprocessor = pinecone.hub.HubFunction(name='movie-preprocessor', image=movie_image_name)
user_preprocessor = pinecone.hub.HubFunction(name='user-preprocessor', image=user_image_name)

ranking_postprocessor = pinecone.hub.HubFunction(name='ranking-postprocessor', image=ranking_image_name)

graph.add_write_preprocessor(fn=movie_preprocessor)
graph.add_read_preprocessor(fn=user_preprocessor)

graph.add_postprocessor(fn=ranking_postprocessor)
# View the updated graph
graph.view()

Function graph for movie recommender system

service_name = 'tfrs-movielens-explicit'
pinecone.service.deploy(service_name, graph, timeout=300)

conn = pinecone.connector.connect(service_name)

conn.info()

Once the service is deployed, we can keep adding new movies to the vector index and querying it with any user ID.
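
As a quick illustration, upserting additional items later uses the same write path (a sketch only; note the premade movie embedding model only knows titles from the MovieLens vocabulary, so a genuinely new title would first require updating that model):

# Sketch: upsert one more item through the live write path.
# The movie preprocessor turns the title into a 32-dimensional embedding before it is indexed.
extra_title = "Toy Story (1995)"  # an existing MovieLens title, used here purely for illustration
ack = conn.upsert(items=[(extra_title, extra_title)]).collect()  # (id, data) pairs, same format as below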

Upload Movie Titles

Recall that movie titles are transformed into vector embeddings, which are then stored in a vector index. The vector index’s role is to receive queries and efficiently identify and retrieve the best-matching results. In this context, the queries are user vector embeddings and the retrieved results are a set of candidate movie recommendations.

# note that the length of item ids is restricted to 64 chars
# deduplicate the titles so each of the 1,682 movies is upserted once
all_movies = sorted({title.decode()[:64] for title in DATA.ratings.map(lambda xx: xx['movie_title']).as_numpy_iterator()})
items_to_upload = [(title, title) for title in all_movies]
# print the input format -- observe that these are just text strings
display(items_to_upload[:5])

upsert_acks = conn.upsert(items=items_to_upload).collect()

conn.info()

Search for Movie Recommendations

Hurray! Your movie recommender service is up and running. You can now search for recommended movies for any user directly from your notebook or app, and get ranked results in real-time. Let’s add one more function that shows a movie poster for each result, just for fun.

Include Movie Posters in Results

The following utility functions help visualize the results using movie posters fetched from the web. This is completely optional and just for fun.

import requests
from IPython.display import Image, display
from IPython.core.display import HTML

def movie_title_to_poster_url(title):
    POSTERS_URL_TEMPLATE = "http://www.omdbapi.com/?i=tt3896198&apikey=4a3604bd&t="
    parsed_title = ' '.join(title.split()[:-1])
    if parsed_title.lower().endswith(', the'):
        parsed_title = "The "+parsed_title[:-len(', the')]
    try:
        r = requests.get(POSTERS_URL_TEMPLATE+parsed_title).json()
        return r['Poster']
    except Exception:
        # Fallback image
        return "https://cdn4.iconfinder.com/data/icons/small-n-flat/24/movie-alt2-512.png"

def path_to_image_html(path):
    return '<img src="'+ path + '" width="100" >'

def show_image_tiles(images_urls, n_col=6):
    import math

    rows = [images_urls[ii * n_col: (1+ii) * n_col] for ii in range(math.ceil(len(images_urls) / n_col))]
    for ii, rr in enumerate(rows):
        if len(rr) < n_col:
            rows[ii] = rr + [''] * (n_col - len(rr))
    df = pd.DataFrame(rows)
    display(HTML(df.to_html(escape=False, formatters=[path_to_image_html]*df.shape[1])))
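
As a quick check of the title normalization (the trailing year is stripped and a trailing “, The” is moved to the front), we can call the helper directly; the exact URL returned depends on the live OMDb API:

print(movie_title_to_poster_url("Fugitive, The (1993)"))  # queries OMDb for "The Fugitive"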

Let’s see what our movie recommender picks for User 55. Note that we retrieve 50 movies but show only the top five. We set top_k=50 to give the re-ranking postprocessor (the ranking model) a sufficiently large set of candidate movies; once re-ranked, we only care about the top five results.

user_ids = ['55']

# Query by user embeddings
cursor = conn.query(queries=user_ids, top_k=50, include_data=True)
query_results = cursor.collect()

# Print results
for _id, res in zip(user_ids, query_results):
    print(f'user_id={_id}')
    df = pd.DataFrame({'ids': res.ids[:5], 'scores': res.scores[:5]})
    df['image'] = list(map(movie_title_to_poster_url, df['ids']))
    display(HTML(df.to_html(escape=False, formatters=dict(image=path_to_image_html))))

ids                          scores
Sudden Death (1995)          4.426783
Thinner (1996)               4.384661
Mr. Holland's Opus (1995)    4.271734
Die Hard (1988)              4.262465
Star Wars (1977)             4.259743

(The image column, which shows each movie’s poster, is omitted here.)

User Rating Profile

Note that the quality of the results depends on the quality of the premade models and the accuracy of Pinecone’s vector index. Comparing the recommendations above with User 55’s actual ratings below, they fit the user’s rating profile well.

user_ratings = [r for r in DATA.ratings.as_numpy_iterator() if r["user_id"] == b'55']
content = [dict(title=r["movie_title"].decode(),
                movie=movie_title_to_poster_url(r["movie_title"].decode()),
                rating=r["user_rating"]) for r in user_ratings]
df = pd.DataFrame(content).sort_values('rating', ascending=False)
display(HTML(df.to_html(escape=False, index=False, formatters=dict(movie=path_to_image_html))))
title                              rating
Fugitive, The (1993)               5.0
Die Hard (1988)                    5.0
Twister (1996)                     5.0
Blade Runner (1982)                5.0
Heat (1995)                        5.0
Braveheart (1995)                  5.0
Raiders of the Lost Ark (1981)     4.0
Return of the Jedi (1983)          4.0
Star Wars (1977)                   4.0
Pulp Fiction (1994)                4.0
Volcano (1997)                     3.0
Men in Black (1997)                3.0
Twelve Monkeys (1995)              3.0
Independence Day (ID4) (1996)      3.0
Rock, The (1996)                   3.0
Eraser (1996)                      2.0
Batman & Robin (1997)              2.0
Speed 2: Cruise Control (1997)     1.0
Mission: Impossible (1996)         1.0
Executive Decision (1996)          1.0
Con Air (1997)                     1.0

(The movie column, which shows each movie’s poster, is omitted here.)

Comparing Recommendations Without Re-ranking

You may be wondering if the postprocessing re-ranking step is worth the effort. Let’s run another service without a postprocessing step and compare the results.

graph_no_postprocessor = pinecone.graph.IndexGraph(metric='dotproduct')

# Add to the graph functions that will transform the items and the queries.
graph_no_postprocessor.add_write_preprocessor(fn=movie_preprocessor)
graph_no_postprocessor.add_read_preprocessor(fn=user_preprocessor)

# View the updated graph
graph_no_postprocessor.view()

Function graph for movie recommender system

service_no_postprocessor_name = 'tfrs-movielens-explicit-without-postprocessor'
pinecone.service.deploy(service_no_postprocessor_name, graph_no_postprocessor, timeout=300)

conn_no_postprocessor = pinecone.connector.connect(service_no_postprocessor_name)
upsert_acks = conn_no_postprocessor.upsert(items=items_to_upload).collect()
conn_no_postprocessor.info()

user_ids = ['55']

# Query by user embeddings
cursor = conn_no_postprocessor.query(queries=user_ids, top_k=50, include_data=True)
query_results = cursor.collect()

# Print results
for _id, res in zip(user_ids, query_results):
    print(f'user_id={_id}')
    df = pd.DataFrame({'ids': res.ids[:5], 'scores': res.scores[:5]})
    df['image'] = list(map(movie_title_to_poster_url, df['ids']))
    display(HTML(df.to_html(escape=False, formatters=dict(image=path_to_image_html))))

ids                                      scores
Lost World: Jurassic Park, The (1997)    2.879311
Men in Black (1997)                      2.787736
Executive Decision (1996)                2.739666
Con Air (1997)                           2.685612
Rock, The (1996)                         2.578943

(The image column, which shows each movie’s poster, is omitted here.)

In this example, the re-ranking step clearly pays off: without it, the top results include Executive Decision (1996) and Con Air (1997), both of which User 55 rated 1.0, while the re-ranked list favors movies consistent with the user’s highest ratings.
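
To make the comparison concrete, we can cross-reference a result list against User 55’s actual ratings (a small sketch; res refers to whichever query result was displayed last):

# Build a lookup of User 55's actual ratings, then check each recommended title against it.
user_55_ratings = {r["movie_title"].decode(): float(r["user_rating"])
                   for r in DATA.ratings.as_numpy_iterator()
                   if r["user_id"] == b"55"}

for title in res.ids[:5]:
    print(f"{title}: actual rating = {user_55_ratings.get(title, 'not rated')}")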


Turn Off the Service

Usually, you will deploy the service and keep it running for as long as it’s being used. In some cases you may want to turn it off, for example to deploy a different version of the service with upgraded models. The loop below stops every running service.

for svc in pinecone.service.ls():
    pinecone.service.stop(svc)

Summary

We showed how to build a movie recommendation service using Pinecone. The service embeds user IDs and movie titles, saves them in a vector index, receives and embeds queries, then retrieves, ranks, and displays personalized movie recommendations for any given user.