How to make a movie recommender: making a REST API using FastAPI

Juan Domingo Ortuzar
Published in Analytics Vidhya · 6 min read · Dec 12, 2020

FastAPI is a backend framework for Python, just like Flask or Django (closer to Flask than Django). The framework makes it really easy to create and test endpoints, since it abstracts away many of the complicated parts of building a REST API. This has pros and cons: you get ease of use in exchange for configurability. All the code for the backend can be found here. All the code for the project can be found here.

What is a REST API?

An API (Application Programming Interface) is an interface that defines how one program can interact with another, based on rules and definitions (source). A REST (REpresentational State Transfer) API is a "style" or set of guidelines for writing web services. In other words, a REST API is software specifically designed to communicate and interact over the web (source).

In this project we will divide our application into two parts: a backend and a frontend. The frontend is how a user interacts with our application: the buttons, the text the user sees, and so on. We will build it using Svelte. The backend processes the interaction between the user and the frontend; it handles the data and, in our case, it is what talks to our recommendation model. As an analogy, think of a vending machine: the frontend is the button where you choose your product, the coin slot, and the screen that tells you how much money you have added. The backend is the system that counts the money, the motors that deliver your product, and the logic that calculates your change, among other things.

We also need a database

To save our data and other things we are going to use MongoDB, because I wanted to use a document-based database engine. You can install the database with the following tutorial, but it is not strictly necessary, since we will be using the Docker image to simplify things; you only need to install MongoDB if you are going to test the system without Docker. Feel free to adapt the code in this tutorial to any database engine you like.
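If you already have Docker installed, a quick way to get a local MongoDB for testing is to run the official image. A sketch (the container name and version tag here are my own choices, not part of the project):

```shell
# Start a throwaway MongoDB on the default port 27017.
docker run -d --name movie-db -p 27017:27017 mongo:4.4

# Stop and remove it when you are done testing.
docker stop movie-db && docker rm movie-db
```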

Let's make a REST API

Let's start by creating a virtual environment for our backend. For this we will use virtualenv (check this tutorial on what virtual environments are and how to use them in Python). Here are the dependencies for this script:

fastapi[all]
pymongo
pandas
requests
uvicorn

You can save them in a file called requirements.txt and run pip install -r requirements.txt, or install each dependency separately.
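Putting the two steps together, the environment setup looks roughly like this (I use Python's built-in venv module here; the virtualenv package works the same way, and the directory name venv is arbitrary):

```shell
# Create and activate a virtual environment for the backend.
python -m venv venv
source venv/bin/activate

# Install the dependencies listed above.
pip install -r requirements.txt
```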

Let's start by connecting our application to our MongoDB database. Since we are going to use Docker to wire the different systems together, some things won't work when you run the script on its own; that is OK. To test locally, there are commented-out parameters that you can uncomment, commenting out the parameter with the same name.
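Instead of commenting and uncommenting, you could also fall back to a local default whenever the environment variable is unset. A small sketch of that pattern (the helper name resolve_host is mine, not part of the tutorial code):

```python
import os

def resolve_host(var_name, default="localhost"):
    """Use the env var when it is set (e.g. inside Docker), else a local default."""
    return os.getenv(var_name) or default

# Outside Docker, MONGOHOST is usually unset, so this falls back to "localhost".
MONGO_HOST = resolve_host("MONGOHOST")
```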

from fastapi import FastAPI, BackgroundTasks, File, UploadFile
from fastapi.middleware.cors import CORSMiddleware
from pymongo import MongoClient
import pandas as pd
import json
import uvicorn
import uuid
from bson import json_util
from typing import List
import requests
import numpy as np
import os
import time
# MONGO_HOST = "localhost"
MONGO_HOST = os.getenv("MONGOHOST")
# TF_SERVING = "localhost"
TF_SERVING = os.getenv("TF_SERVING_HOST")
mongo_client = MongoClient(MONGO_HOST, 27017)
db = mongo_client["MovieRecommenderDB"]
movie_col = db["movies"]
user_col = db["user"]
status_col = db["status"]
ratings_col = db["ratings"]
movie_encoded_col = db["movieEncoding"]

Ignoring the imports, we are simply connecting to our MongoDB instance, selecting our database named MovieRecommenderDB and our collections. If you haven't created the database or the collections before, don't worry: MongoDB will create them the first time you write to them.

Now, let's start creating our FastAPI application.

app = FastAPI()

origins = ["*"]
app.add_middleware(
    CORSMiddleware,
    allow_origins=origins,
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

In this couple of lines there is a lot to unpack. First, app is the FastAPI instance; this alone is the minimum needed to create our API. The following lines allow our frontend to communicate with our backend. To learn more about CORS you can read here and/or here.

Let's assume that the database is going to be updated through our API, so let's create the logic to make that possible.

def get_number_of(collection):
    # count() on a cursor is deprecated in newer PyMongo; count_documents is the
    # supported way to get the size of a collection.
    num_of_movies = collection.count_documents({})
    return num_of_movies

def save_to_db(csv_file, file_size, jobId, collection):
    _index = get_number_of(collection)
    # First pass: count the lines so we can report a progress percentage.
    csv_file.file.seek(0)
    for i, l in enumerate(csv_file.file):
        pass
    # Second pass: read the header line, then parse each row.
    csv_file.file.seek(0)
    headers = csv_file.file.readline().decode().replace(";\n", "").split(";")
    for j, l in enumerate(csv_file.file):
        line = l.decode()
        line = line.replace(";\n", "")
        row_elem = line.split(";")
        if len(row_elem) > len(headers):
            job_doc = {"jobId": jobId,
                       "status": "Error",
                       "percentage": int((j / i) * 100),
                       "reason": f"Illegal character in line {j}"}
            status_col.update_one({"jobId": jobId}, {"$set": job_doc})
        else:
            doc = {}
            for e in range(len(row_elem)):
                doc[headers[e]] = row_elem[e]
            doc["index"] = _index + j
            # Only insert rows we have not seen before.
            if collection.find_one(doc) is None:
                collection.insert_one(doc)
            status_col.update_one({"jobId": jobId}, {"$set": {"percentage": int((j / i) * 100)}})
    status_col.update_one({"jobId": jobId}, {"$set": {"percentage": 100, "status": "complete",
                                                      "fileName": csv_file.filename,
                                                      "fileSize": file_size, "numOfRows": i}})

def make_movie_encoding():
    unique_movies = ratings_col.distinct("movieId")
    movie_encoder = {x: i for i, x in enumerate(unique_movies)}
    for key, item in movie_encoder.items():
        doc = {"movieId": key, "index": item}
        movie_encoded_col.insert_one(doc)

def save_ratings_to_db(csv_file, file_size, jobId):
    save_to_db(csv_file, file_size, jobId, ratings_col)
    make_movie_encoding()

@app.get("/status/{jobId}")
def get_status_bulk_update(jobId: str):
    job_doc = status_col.find_one({"jobId": jobId}, {'_id': False})
    job_doc = json.loads(json_util.dumps(job_doc))
    return job_doc

def insert_status(doc):
    status_col.insert_one(json.loads(json.dumps(doc)))

@app.post("/movies/bulk_update")
def bulk_update_movie_database(background_task: BackgroundTasks, csv_file: UploadFile = File(...)):
    jobId = str(uuid.uuid4())
    # Seek to the end of the upload to measure its size in kilobytes.
    csv_file.file.seek(0, 2)
    file_size = csv_file.file.tell() / 1000
    job_doc = {"jobId": jobId,
               "status": "inProgress",
               "percentage": 0}
    insert_status(job_doc)
    background_task.add_task(save_to_db, csv_file, file_size, jobId, movie_col)
    return {"filename": csv_file.filename,
            "file_size": file_size,
            "job": job_doc}

@app.post("/ratings/bulk_update")
def bulk_update_rating_database(background_task: BackgroundTasks, csv_file: UploadFile = File(...)):
    jobId = str(uuid.uuid4())
    csv_file.file.seek(0, 2)
    file_size = csv_file.file.tell() / 1000
    job_doc = {"jobId": jobId,
               "status": "inProgress",
               "percentage": 0}
    insert_status(job_doc)
    background_task.add_task(save_ratings_to_db, csv_file, file_size, jobId)
    return {"filename": csv_file.filename,
            "file_size": file_size,
            "job": job_doc}

In this code we will start with the functions bulk_update_rating_database, bulk_update_movie_database and get_status_bulk_update. As you can see, above each of these functions there is a Python decorator, @app.get or @app.post. These decorators let us define URL paths, or endpoints. An endpoint can use one of many HTTP methods, but most of the time you use four: GET, POST, PUT and DELETE. For the data pipeline we use POST to update our database with new ratings or new movies, and GET to check the status of a task.
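Inside save_to_db, the step that turns a CSV row into a MongoDB document boils down to pairing headers with values. A tiny sketch with made-up values:

```python
# Hypothetical header line and data row from a semicolon-separated upload.
headers = "movieId;title;genres".split(";")
row = "1;Toy Story (1995);Animation|Comedy".split(";")

# zip pairs each header with its value, just like the loop in save_to_db.
doc = dict(zip(headers, row))
print(doc)  # → {'movieId': '1', 'title': 'Toy Story (1995)', 'genres': 'Animation|Comedy'}
```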

One of the great features of FastAPI is something called BackgroundTasks. You can tell FastAPI to push some work, like the data preprocessing in this case, to run in the background, and give your user a way to check the status of their task.
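The pattern is easy to see with the database swapped out for an in-memory dict (a sketch of the idea, not the actual endpoint code):

```python
import uuid

# In-memory stand-in for the status collection, just to illustrate the pattern.
status_store = {}

def start_job():
    """Create a job document the client can poll, like our POST endpoints do."""
    jobId = str(uuid.uuid4())
    status_store[jobId] = {"status": "inProgress", "percentage": 0}
    return jobId

def run_job(jobId):
    """The background task updates the document as it makes progress."""
    for pct in (25, 50, 75, 100):
        status_store[jobId]["percentage"] = pct
    status_store[jobId]["status"] = "complete"

jobId = start_job()
run_job(jobId)
print(status_store[jobId])  # → {'status': 'complete', 'percentage': 100}
```

In the real application the client calls POST to start the job, gets the jobId back immediately, and polls GET /status/{jobId} until the status flips to complete.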

Now let's move on to calling our model to make recommendations.

def find_movies_by_ids(id_list):
    title_list = []
    for id in id_list:
        movie_title = movie_col.find_one({"movieId": str(id)})
        if movie_title is not None:
            title_list.append(movie_title["title"])
    return title_list

def get_recomendation(movie_ids):
    # Pair user 0 with every candidate movie index for the model input.
    movie_array = np.hstack(([[0]] * len(movie_ids), movie_ids))
    body = {"instances": movie_array.tolist()}
    url = f"http://{TF_SERVING}:8501/v1/models/movie_model:predict"
    response = requests.request("POST", url, data=json.dumps(body))
    aux = response.json()
    return aux

def get_movie_index(movieIds):
    movie_indexs = []
    for movie in movieIds:
        movie_doc = movie_col.find_one({"movieId": str(movie)})
        if movie_doc is not None:
            movie_indexs.append([movie_doc["index"]])
    return movie_indexs

def encode_movieIds(movieIds):
    encoded_movies = []
    for movie in movieIds:
        doc = movie_encoded_col.find_one({"movieId": str(movie)})
        if doc is not None:
            encoded_movies.append([doc["index"]])
    return encoded_movies

def find_movies_not_watched(movieIndexs):
    # $nor with $in selects every movie whose id is NOT in the input list.
    movies_to_watch = movie_col.find({"$nor": [{"movieId": {"$in": movieIndexs}}]})
    indexs_to_watch = [int(x["movieId"]) for x in movies_to_watch]
    return indexs_to_watch

def clean_up_recommendations(recommendation_scores, top_indexes):
    recommendation_body = []
    for index in top_indexes:
        movieId = movie_encoded_col.find_one({"index": int(index)}, {'_id': False})["movieId"]
        movieDoc = movie_col.find_one({"movieId": movieId}, {'_id': False})
        movieScore = recommendation_scores[index]
        body = {
            "title": movieDoc["title"],
            "genre": movieDoc["genres"],
            "movieScore": movieScore[0]
        }
        recommendation_body.append(body)
    return recommendation_body

def generate_recommendations(movieIds, jobId):
    start = time.time()
    movieIds = [str(x) for x in movieIds]
    still_to_watch_indxs = find_movies_not_watched(movieIds)
    encoded_movies = encode_movieIds(still_to_watch_indxs)
    recommendation = get_recomendation(encoded_movies)
    recomender_scores = np.array(recommendation["predictions"]).flatten()
    # argsort is ascending, so take the last 10 indices and reverse them.
    top_recommendations_indx = recomender_scores.argsort()[-10:][::-1]
    recommendations = clean_up_recommendations(recommendation["predictions"], top_recommendations_indx)
    inputMovieTitles = find_movies_by_ids(movieIds)
    end = time.time()
    timeTaken = end - start
    status_col.update_one({"jobId": jobId}, {"$set": {"status": "complete", "input": inputMovieTitles,
                                                      "recommendation": recommendations,
                                                      "timeTaken": timeTaken}})

@app.post("/movie/make_recom")
def make_recomendation(movies: List, background_task: BackgroundTasks):
    jobId = str(uuid.uuid4())
    job_doc = {"jobId": jobId,
               "status": "inProgress"}
    insert_status(job_doc)
    background_task.add_task(generate_recommendations, movies, jobId)
    return job_doc

@app.get("/autocomplete")
def get_autocomplete_movies():
    movie_all = movie_col.find()
    data = {}
    for doc in movie_all:
        data[doc["title"]] = doc["movieId"]
    return data

if __name__ == "__main__":
    uvicorn.run("main:app", host="127.0.0.1", port=8000, log_level="info")

The gist of these functions is simple, more or less. We start by receiving a list of movie IDs, then select all the movies that are not in that list, i.e. the ones the user hasn't watched. We then call our model with an HTTP request to our TensorFlow Serving server. We receive a list of probabilities, take the 10 indices with the highest probability, transform those indices back into movies, and return them to the user through our status endpoint.
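The top-10 selection in generate_recommendations is worth a closer look, since argsort sorts ascending. A small sketch with a hypothetical score vector (top 3 instead of top 10):

```python
import numpy as np

# Hypothetical model scores for six movies (the real vector has thousands).
scores = np.array([0.1, 0.9, 0.3, 0.7, 0.5, 0.2])

# Same trick as in generate_recommendations: argsort gives ascending order,
# so take the last k indices and reverse them to get best-first.
top_3 = scores.argsort()[-3:][::-1]
print(top_3.tolist())  # → [1, 3, 4]
```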

How to test this?

One of the great things about FastAPI is the automatic documentation generated with Swagger. Run this script on your computer, point a web browser at http://localhost:8000/docs, and you should see something like this:

Here you can try your endpoints. Some may not work because of external dependencies like MongoDB or TensorFlow Serving; don't worry though, because we will connect everything together in the Docker part of this tutorial.
