
Workflow orchestration for video processing

Patrice Gargiolo

Today we’re going to see how to build workflow orchestration in the context of video processing and walk through why we need it.

Workflow Orchestration: A Brief Explanation

Workflow orchestration streamlines complex processes with multiple stages or tasks by automating and managing them efficiently.

These tasks can be manual or automated, and the idea applies across industries. For example, in an automobile assembly line, tasks like welding, painting, and final assembly must be coordinated. Managing material flow, machine maintenance, and quality control checkpoints is just as essential.

In the context of video processing, workflow orchestration plays a pivotal role by streamlining the various tasks involved in the end-to-end production pipeline. By automating and coordinating tasks, such as ingestion, transcoding, editing, quality control, compression, and distribution, workflow orchestration ensures a seamless and efficient process to serve diverse output formats and platforms.


The problem

Managing workflows efficiently can be challenging, as it involves monitoring the progress and status of every task. Tasks may have varying priorities and dependencies, and they need error handling. While this may be manageable on a single machine, scaling to a cluster requires a resilient workflow system that can withstand failures such as power outages or hardware malfunctions.

Furthermore, different stages of the pipeline may require different hardware resources, such as GPUs or specialised video processors, which need to be allocated and managed appropriately. Additionally, with the increasing amount of data that needs to be processed, stored, and distributed, workflow orchestration needs to consider data integrity, security, and compliance. Addressing these challenges requires a powerful and flexible workflow orchestration solution capable of meeting the needs of modern video processing pipelines.
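To illustrate the hardware allocation point, here is a minimal sketch of routing tasks to queues served by suitable machines. Workflow engines such as Temporal expose a similar idea through named task queues that workers poll. The task names, the `Resource` labels, and the `video-cpu` / `video-gpu` queue names are assumptions made up for this example.

```go
package main

import "fmt"

// Resource is a hypothetical label for the hardware a task needs.
type Resource string

const (
	CPU Resource = "cpu"
	GPU Resource = "gpu"
)

// requirements maps illustrative task names to the hardware they need.
var requirements = map[string]Resource{
	"download":  CPU,
	"transcode": GPU,
	"upload":    CPU,
}

// QueueFor returns the name of the queue whose workers can run the task.
// Tasks without a declared requirement fall back to the CPU queue.
func QueueFor(task string) string {
	res, ok := requirements[task]
	if !ok {
		res = CPU
	}
	return "video-" + string(res)
}

func main() {
	for _, task := range []string{"download", "transcode", "upload"} {
		fmt.Printf("%s -> %s\n", task, QueueFor(task))
	}
}
```

Workers running on GPU machines would then listen only on the GPU queue, so a transcoding task never lands on a machine that cannot run it.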


Implementing the Workflow

There are different ways to implement a workflow orchestration system, but one common approach is to use a workflow engine. Popular workflow engines include Apache Airflow, Temporal, Uber Cadence, Netflix Conductor, and many smaller projects. I highly recommend not building your own engine on top of tools like Redis or RabbitMQ unless you have a very specific use case. Using an existing workflow engine can save you a lot of time and trouble (and your hair :)) because it has already solved many common issues.

Here at Wavestream Labs, we chose Temporal as we were already using Golang and Kubernetes, so it fit nicely in our ecosystem.

To design your workflow, you need to identify the different stages of your video processing pipeline. Once you’ve identified the different stages, you can define the tasks that make up each stage. For example, ingesting might involve downloading videos from an external source, while transcoding might involve converting them to a different format or extracting a small part of the source.
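Before picking an engine, it can help to write the stages down as data. The sketch below is engine-independent and only assumes a hypothetical `Task` type with a name and a list of dependencies; it derives an execution order in which every task runs after the tasks it depends on, and rejects pipelines with unknown dependencies or cycles.

```go
package main

import (
	"errors"
	"fmt"
)

// Task is a hypothetical description of one pipeline step.
type Task struct {
	Name      string
	DependsOn []string
}

// ExecutionOrder returns the task names ordered so that every task appears
// after all of its dependencies (depth-first topological sort).
// It returns an error for unknown dependencies and for dependency cycles.
func ExecutionOrder(tasks []Task) ([]string, error) {
	byName := make(map[string]Task, len(tasks))
	for _, t := range tasks {
		byName[t.Name] = t
	}

	const (
		visiting = 1
		done     = 2
	)
	state := make(map[string]int, len(tasks)) // zero value means "not visited yet"
	order := make([]string, 0, len(tasks))

	var visit func(name string) error
	visit = func(name string) error {
		t, ok := byName[name]
		if !ok {
			return fmt.Errorf("unknown task %q", name)
		}
		switch state[name] {
		case done:
			return nil
		case visiting:
			return errors.New("dependency cycle detected at " + name)
		}
		state[name] = visiting
		for _, dep := range t.DependsOn {
			if err := visit(dep); err != nil {
				return err
			}
		}
		state[name] = done
		order = append(order, name)
		return nil
	}

	for _, t := range tasks {
		if err := visit(t.Name); err != nil {
			return nil, err
		}
	}
	return order, nil
}

func main() {
	tasks := []Task{
		{Name: "upload", DependsOn: []string{"transcode"}},
		{Name: "transcode", DependsOn: []string{"download"}},
		{Name: "download"},
	}
	order, err := ExecutionOrder(tasks)
	if err != nil {
		fmt.Println("invalid pipeline:", err)
		return
	}
	fmt.Println(order) // prints [download transcode upload]
}
```

With a workflow engine you rarely compute this order yourself, since the workflow function expresses it directly, but the exercise makes missing or circular dependencies visible early.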

Here’s an example in Go using Temporal:

import (
	"log"
	"time"

	"go.temporal.io/sdk/client"
	"go.temporal.io/sdk/worker"
	"go.temporal.io/sdk/workflow"
)

// Workflow definition as code
func VideoProcessingWorkflow(ctx workflow.Context, sourceUrl string) error {
	// Activities need a timeout before they can be scheduled.
	// 30 minutes is an illustrative value; tune it per task.
	ctx = workflow.WithActivityOptions(ctx, workflow.ActivityOptions{
		StartToCloseTimeout: 30 * time.Minute,
	})

	// Download source video
	var downloadedFilePath string
	err := workflow.ExecuteActivity(ctx, DownloadSourceActivity, DownloadSourceInput{SourceUrl: sourceUrl}).Get(ctx, &downloadedFilePath)
	if err != nil {
		return err
	}

	// Transcode video to HLS
	transcodeInput := &TranscodeToHLSInput{
		SourceURL:       downloadedFilePath,
		SegmentDuration: 10,
		OutputPath:      "/path/to/transcoded/video.m3u8",
	}
	err = workflow.ExecuteActivity(ctx, TranscodeToHLSActivity, transcodeInput).Get(ctx, nil)
	if err != nil {
		return err
	}

	// Upload transcoded video to S3
	return workflow.ExecuteActivity(ctx, SendToS3Activity, transcodeInput.OutputPath).Get(ctx, nil)
}

func main() {
	// Connect to the Temporal server (defaults to localhost:7233)
	temporalClient, err := client.Dial(client.Options{})
	if err != nil {
		log.Fatalln("Unable to create Temporal client", err)
	}
	defer temporalClient.Close()

	// Create an instance of a Temporal worker for managing workflow and activity executions.
	w := worker.New(temporalClient, "myQueueName", worker.Options{})

	// Register each activity under its own function name; registering
	// several activities under one shared name would conflict.
	w.RegisterActivity(DownloadSourceActivity)
	w.RegisterActivity(TranscodeToHLSActivity)
	w.RegisterActivity(SendToS3Activity)

	w.RegisterWorkflowWithOptions(VideoProcessingWorkflow, workflow.RegisterOptions{
		Name: "video-processing-workflow",
	})

	err = w.Run(worker.InterruptCh())
	if err != nil {
		log.Fatalln("Unable to start worker", err)
	}
}

With a workflow engine in hand, you can then implement the tasks and their dependencies, and the workflow engine takes care of executing them at least once, handling errors, and providing reporting.
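"At least once" means a task may run more than once when an attempt fails or its result is lost, so tasks should be safe to repeat. The sketch below shows the retry part of that contract using only the standard library. `RunWithRetry`, `ErrTransient`, and the backoff values are names and numbers invented for this example; a real engine such as Temporal also persists workflow state so that retries survive a worker crash, which this in-memory version does not.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// ErrTransient is a hypothetical error used to simulate a temporary failure.
var ErrTransient = errors.New("transient failure")

// RunWithRetry calls task until it succeeds or maxAttempts is reached,
// doubling the wait between attempts. It returns the number of attempts
// made and, if every attempt failed, the last error wrapped with context.
func RunWithRetry(task func() error, maxAttempts int, initialBackoff time.Duration) (int, error) {
	if maxAttempts < 1 {
		maxAttempts = 1
	}
	backoff := initialBackoff
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if err = task(); err == nil {
			return attempt, nil
		}
		if attempt < maxAttempts {
			time.Sleep(backoff)
			backoff *= 2
		}
	}
	return maxAttempts, fmt.Errorf("giving up after %d attempts: %w", maxAttempts, err)
}

func main() {
	calls := 0
	// flakyUpload fails twice before succeeding, like a brief network outage.
	flakyUpload := func() error {
		calls++
		if calls < 3 {
			return ErrTransient
		}
		return nil
	}

	attempts, err := RunWithRetry(flakyUpload, 5, 10*time.Millisecond)
	fmt.Println(attempts, err) // prints 3 <nil>
}
```

Because the upload may be attempted several times, writing to a deterministic destination (for example, the same object key on every attempt) keeps repeated runs harmless.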


Implementing the Tasks

This is where you’ll add the actual work to be done. In a video processing workflow, many of these tasks will be wrappers around FFmpeg ;)

Let’s see an example to convert the input into HLS in Golang using FFmpeg:

import (
	"context"
	"log"
	"os/exec"
	"strconv"
)

// LogWriter is a custom writer that writes to log.Printf
type LogWriter struct{}

// Write implements the io.Writer interface and writes data to log.Printf
func (lw *LogWriter) Write(p []byte) (n int, err error) {
	log.Printf("%s", p)
	return len(p), nil
}

type TranscodeToHLSInput struct {
	SourceURL       string // The URL of the video to transcode to HLS
	SegmentDuration int    // The duration of each segment in seconds
	ListSize        int    // The maximum number of playlist entries
	OutputPath      string // The path to save the HLS playlist and segments
}

func TranscodeToHLSActivity(ctx context.Context, input *TranscodeToHLSInput) error {
	// Define the command and arguments to execute ffmpeg.
	// CommandContext kills ffmpeg if the activity context is cancelled.
	cmd := exec.CommandContext(
		ctx,
		"ffmpeg",
		"-i", input.SourceURL,
		"-c:v", "libx264",
		"-c:a", "aac",
		"-f", "hls",
		"-hls_time", strconv.Itoa(input.SegmentDuration),
		// 0 keeps every segment in the playlist
		"-hls_list_size", strconv.Itoa(input.ListSize),
		input.OutputPath,
	)

	// Send the output of the ffmpeg command to the logger
	logWriter := &LogWriter{}
	cmd.Stdout = logWriter
	cmd.Stderr = logWriter

	// Run the command
	if err := cmd.Run(); err != nil {
		log.Printf("Error running ffmpeg command: %v\n", err)
		return err
	}

	log.Println("task completed successfully")
	// If the command executed successfully, return nil
	return nil
}
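One way to make such a wrapper easier to test is to separate building the FFmpeg arguments from running the process. The sketch below is a possible shape for that, not part of the original activity: `HLSOptions` mirrors the fields of `TranscodeToHLSInput`, `BuildHLSArgs` is a hypothetical helper, and it passes `ListSize` through FFmpeg's `-hls_list_size` option (where 0 keeps every segment in the playlist).

```go
package main

import (
	"errors"
	"fmt"
	"strconv"
)

// HLSOptions mirrors the fields of TranscodeToHLSInput from the activity.
type HLSOptions struct {
	SourceURL       string // URL or path of the video to transcode
	SegmentDuration int    // duration of each segment in seconds
	ListSize        int    // maximum number of playlist entries, 0 keeps all
	OutputPath      string // where to write the playlist and segments
}

// BuildHLSArgs validates the options and returns the ffmpeg arguments
// (without the binary name), so they can be checked without running ffmpeg.
func BuildHLSArgs(o HLSOptions) ([]string, error) {
	if o.SourceURL == "" || o.OutputPath == "" {
		return nil, errors.New("source URL and output path are required")
	}
	if o.SegmentDuration <= 0 {
		return nil, errors.New("segment duration must be positive")
	}
	if o.ListSize < 0 {
		return nil, errors.New("list size cannot be negative")
	}
	return []string{
		"-i", o.SourceURL,
		"-c:v", "libx264",
		"-c:a", "aac",
		"-f", "hls",
		"-hls_time", strconv.Itoa(o.SegmentDuration),
		"-hls_list_size", strconv.Itoa(o.ListSize),
		o.OutputPath,
	}, nil
}

func main() {
	args, err := BuildHLSArgs(HLSOptions{
		SourceURL:       "input.mp4",
		SegmentDuration: 10,
		OutputPath:      "/tmp/out.m3u8",
	})
	if err != nil {
		fmt.Println("invalid options:", err)
		return
	}
	fmt.Println(args)
}
```

The activity could then call `exec.CommandContext(ctx, "ffmpeg", args...)`, and invalid input fails fast instead of surfacing as an FFmpeg error in the logs.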

Then in your workflow, the task (an activity in Temporal) would be called like this:

transcodeToHLSInputParams := &TranscodeToHLSInput{
	SourceURL:       "https://test-videos.co.uk/vids/bigbuckbunny/mp4/h264/360/Big_Buck_Bunny_360_10s_1MB.mp4",
	SegmentDuration: 10,
	OutputPath:      "/tmp/workflowtask.m3u8",
}
// ctx is the workflow.Context passed to the workflow function.
// The activity only returns an error, so there is no result to read.
err := workflow.ExecuteActivity(ctx, TranscodeToHLSActivity, transcodeToHLSInputParams).Get(ctx, nil)

And that’s it! You now have tasks being triggered reliably in the correct order. You can combine tasks in any order; you just have to modify the workflow function. Using workflow orchestration for video processing will greatly improve your developer experience, as you can add more tasks while staying confident in the way they are executed.

If you find this approach challenging or prefer a hassle-free setup, you can explore the solution we developed at Wavestream Labs: a powerful and user-friendly video API that takes care of your workflow orchestration needs. Thanks for reading, and feel free to reach out if you have any questions or feedback.