How to upload 50 OpenCV frames into cloud storage within 1 second

How to read RTSP/Video frames and upload them into cloud storage in an asynchronous way

Bala Venkatesh
Analytics Vidhya


USE CASE

These days, most of us are familiar with the growing number of industries that use computer vision applications. CCTV surveillance and video analysis, in particular, play a major role in computer vision technology.

For instance, to analyze a CCTV camera feed, the first step is to read the RTSP URL using OpenCV; then we need to store the frames somewhere in the cloud for further analysis.

But here is the problem: when we upload the frames one after the other into the cloud, each upload takes time, doesn’t it?

To get a clear picture of this, I ran an experiment with a Google Cloud Storage bucket and found that a single frame takes about 1.05 seconds to upload. That means we have to wait roughly a second for the response before we can upload the next frame in the row.
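To put that figure in perspective, here is a quick back-of-the-envelope sketch. The 1.05 s per-frame number is taken from the measurement above; the function is just arithmetic, not a benchmark:

```python
# Rough cost of uploading frames one after the other, using the ~1.05 s
# per-frame figure measured above (an assumption carried over from the text).
PER_FRAME_SECONDS = 1.05

def sequential_upload_time(num_frames, per_frame=PER_FRAME_SECONDS):
    """Total wall-clock time when each upload waits for the previous one."""
    return num_frames * per_frame

# 50 frames uploaded back to back would take close to a minute.
print(sequential_upload_time(50))
```

For a camera producing tens of frames per second, waiting close to a minute per batch is clearly not workable.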

I spent some time brainstorming a solution for this, and here it is!

The solution is, we can upload frames in an asynchronous way using Celery.

When we upload the frames asynchronously, we lose the original frame order, so we need to use the group and chain concepts in Celery to preserve it.

For those who are new to it: what is Celery?

Celery is one of the most popular background job managers in the Python world.

Celery is compatible with several message brokers, such as RabbitMQ and Redis, and can act as both producer and consumer. It is an asynchronous task queue/job queue based on distributed message passing. It is focused on real-time operations and supports scheduling as well.

Having clarified the definition, let us see how to configure Celery in Python.

Step 1:- Import all the necessary Celery packages

from celery import Celery
from celery.result import AsyncResult
from celery.result import allow_join_result
from celery.decorators import periodic_task

Step 2:- Configure the broker and backend in Celery. I have used Redis as the backend, so install Redis on your system and make sure it is running successfully;

app = Celery('tasks', backend='redis://guest@127.0.0.1:6379', broker='redis://guest@127.0.0.1:6379')

Step 3:- To call a function asynchronously, we decorate it with @app.task.

Here’s a sample Celery task that uploads frames into a Google Cloud Storage bucket.

@app.task(bind=True, max_retries=30)
def upload_frames_gcs(self, file_name, ts):
    try:
        # upload_file_to_gcs is a helper that pushes the file to the
        # bucket and returns its URL; the timestamp is passed through
        # so the frame order can be reconstructed later
        url = upload_file_to_gcs(file_name)
        return url, ts
    except Exception as e:
        # retry the upload (up to max_retries times) if it failed
        raise self.retry(exc=e)

Step 4:- These are the most important steps:

We can’t simply call the function directly for every frame, because uploading asynchronously loses the frame order. Instead, we use the chains and groups concept in Celery to upload frames into the bucket. With this technique, we can upload 5 or 10 frames in parallel and still get the frames back in sequence. Before moving into the code, let us first see what chains and groups are in Celery.

Chains in Celery

A chain is a primitive that lets us link multiple tasks into one signature, so that they are called “one after the other, essentially forming a chain of callbacks”.

If you are still unsure, the diagram below will give you a clear idea of how a chain works in Celery. The values shown are Celery task IDs.
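Outside of Celery, the chain idea can be sketched with plain Python functions standing in for tasks. The function names here are invented for illustration; a real Celery chain would link task signatures instead:

```python
from functools import reduce

# Plain-function stand-ins for Celery tasks (names are illustrative only).
def resize_frame(frame):
    return f"resized({frame})"

def upload_frame(frame):
    return f"uploaded({frame})"

def chain_calls(*funcs):
    """Mimic Celery's chain: each task's result feeds the next callback."""
    def run(initial):
        return reduce(lambda value, func: func(value), funcs, initial)
    return run

pipeline = chain_calls(resize_frame, upload_frame)
print(pipeline("frame_001.jpg"))  # uploaded(resized(frame_001.jpg))
```

Each function’s return value becomes the next function’s input, which is exactly the “chain of callbacks” behaviour described above.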

Groups in Celery

The group primitive is a signature that takes a list of tasks which should be applied in parallel.
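The behaviour a group gives you — run tasks in parallel, collect the results in submission order — can be sketched with Python’s standard library. Here fake_upload is a stand-in, not a real Google Cloud Storage call:

```python
from concurrent.futures import ThreadPoolExecutor

def fake_upload(file_name):
    """Stand-in for an upload task; returns a pretend URL."""
    return f"https://storage.example.com/{file_name}"

def run_group(file_names, workers=5):
    # map() runs the calls concurrently but yields results in the
    # same order the inputs were submitted, like a Celery group.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fake_upload, file_names))

print(run_group(["frame_1.jpg", "frame_2.jpg", "frame_3.jpg"]))
```

Even though the uploads run in parallel, the result list comes back in the order the frames were submitted, which is why the frame sequence survives.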

Here is sample code showing how I uploaded frames into the Google bucket using the groups and chains technique in Celery.

jobs = group(upload_frames_gcs.s(file_name, ts) for ts, file_name in file_name_dic.items())
result = jobs.apply_async()

Here I call the upload_frames_gcs function inside a group. The .s() method creates a signature for the task and its arguments; signatures are what let Celery link tasks together, so that “one is called after the other, essentially forming a chain of callbacks”. Finally, we get the results of the whole group back as one task.

Step 5:- You may wonder how to get the frame URLs after uploading with Celery. It is as simple as this: the result variable holds the task ID of the group, and we can use that ID to fetch the results.

However, be careful to check the status of the task first; once it has completed, we can get the frame URLs.

def taskid_status(task_id_array):
    frames_array = []
    # iterate over a copy so we can safely remove finished tasks
    for task in task_id_array[:]:
        if task.successful():
            task_id_array.remove(task)
            with allow_join_result():
                for results in task.join():
                    frame_dic = {}
                    frame_dic['frame_url'] = results[0]
                    frames_array.append(frame_dic)
    return task_id_array, frames_array

In the frames_array variable, you get all the frame URLs along with their timestamps.

I have tested the performance with several different test cases.

  1. 5 frames take 0.85 sec to upload to Google Cloud Storage.
  2. 10 frames take 0.77 to 0.82 sec.
  3. 15 frames take 0.9 to 1.0 sec.
  4. 30 frames take 0.7 to 0.8 sec.

As the numbers show, increasing the number of frames makes little difference to the upload time, because Celery uses multiprocessing to execute the tasks concurrently.
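Combining these timings with the earlier single-frame measurement gives a rough idea of the speedup. This is simple arithmetic, assuming the ~1.05 s sequential figure from the earlier experiment still holds:

```python
SEQUENTIAL_PER_FRAME = 1.05  # seconds per frame, from the earlier measurement

def group_speedup(num_frames, parallel_seconds):
    """How many times faster the grouped upload is versus one at a time."""
    return (num_frames * SEQUENTIAL_PER_FRAME) / parallel_seconds

# 30 frames in ~0.8 s instead of ~31.5 s sequentially
print(round(group_speedup(30, 0.8), 1))
```

Even for the smallest batch of 5 frames, the grouped upload is several times faster than uploading one frame at a time.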

If you want further assistance or help, please feel free to contact me at balavenkatesh.com 📬 I am happy to help you. 🖥️
