Data Plumbing With Luigi
Updated: Mar 19, 2019
I didn't have brothers and I'm not the youngest sibling, so this may shock you, but Super Mario Brothers?! Please, Luigi is the man. He's taller, he's slimmer and he has a cooler mustache. Also, he shares his name with one of the most useful Python packages I've ever had the good fortune to discover.
The luigi package was written by some very clever folks at Spotify, and as a data scientist I honestly cannot imagine my professional life without it. It's advertised as a pipelining tool, but it's really so much more than that. One of the pain points I've experienced time and again in this profession is collecting the various tasks we have to perform as data scientists and assembling them into a structured, repeatable code base with a common skeleton. This module is designed for exactly that. Before I get into how, let me back up for some context.
A few years ago I was leading a small team of engineers and analysts at a hedge fund. This was before the days of Bloomberg and single-source, integrated datasets. We were predominantly responsible for building out exactly that - a consolidated, integrated database that could be used by the traders to research and test their investment strategies. Unfortunately, we had no common set of standards or code base and everyone was sort of doing their own thing. This made it very difficult to coordinate workflows and build anything in a consistent way. One of the more brilliant members of our group decided to build a common DAG-based backbone for all our processes that could tie everything together - the Quant Engine I think he called it, but my memory is a little fuzzy. There were a set of abstract classes (units) that each of our processes could inherit from, we coded our dependencies and then we executed our jobs. He even built a bunch of infrastructure around it to enable submission of our jobs in a parallelizable way to a high performance compute cluster. And all of this in MATLAB - yuck! Did I mention he was quite brilliant?! Anyways, it took some time, but once everyone learned how to use the system and bought into its benefits, we never looked back. So when I inevitably moved on to a new employer, I searched high and low for a Python alternative. Luigi was the closest I got.
Luigi advertises itself as a module that helps you build complex pipelines of batch jobs, while taking care of workflow management, parallelism and visualization. It's much more than this, and it forms the backbone of a common repository of code that our small group of data scientists uses and contributes to daily. It allows us to modularize our tasks into dependency graphs without having to worry too much about parallelizing our workflow - we just design our tasks appropriately and luigi takes care of the rest. We've actually bootstrapped model and API build and deployment pipelines using luigi as a framework, which has helped us streamline deployment of our models into production. Again, once everyone was speaking a common language and had come to terms with the slightly different way of thinking about writing code, nobody looked back.
Let's dive into a simple example. Fundamentally, each luigi task has a set of inputs, a set of outputs, and a bunch of "work" that needs to be done. Conceptually, your inputs and outputs might be data files, command line arguments, database tables or some other tangible thing. For the purposes of our example, we'll use data files. The package makes defining tasks and stringing them together into a workflow quite easy and elegant. Note that all code for the following example is available here. The file you want is titled luigi_demo.
This may seem counter-intuitive, but let's start with the outputs. Implementing a luigi task class requires you to override the output() method. This defines where to write the output of the task, which can take many forms: a flat file, a database table or some other format. For our purposes we'll use a flat file, denoted by wrapping the path in a luigi.LocalTarget() instance. The next step is to implement our run() method, which defines the work the task should do. In our case, we just want to generate a random pandas DataFrame and write it to the output file. We'll include a pause so we can see our task running. Writing the output file matters because its existence is what signals to luigi that the task is complete. This behavior introduces one of luigi's other benefits - fault tolerance. If you execute the same workflow again and the outputs of a given task already exist, luigi will not re-run that task. This is awesome for long-running jobs that may break - luigi will simply pick up where it left off on the next run. Let's implement our run() method:
Note the call to output() to get the path to write the output to. This is actually all we need for a task that has no upstream dependencies. We can build a task graph containing this task and execute it like so.
Only after calling luigi.build() does anything execute. The arguments are reasonably straightforward. You pass your list of tasks, the number of workers you want luigi to use for parallelizing your job, and whether to use the local scheduler or not. The module ships with a central scheduler which takes care of job visualization and coordination, but for now, let's just use the local_scheduler option without that overhead.
This is well and good, but all we've done so far is execute a single task with no dependencies. How do we define these dependencies? And how do we feed data from task to task? The answer? Overriding the requires() method of luigi.Task.
In order to define dependencies, we need to implement the requires() method. This method returns the Task objects that this task requires in order to run. It's an elegant way of defining self-contained dependencies without having to manage a giant external XML or Python file, which can be a maintenance chore for large jobs with hundreds of tasks. Let's create a second task, dependent on the first, and string it all together in a job workflow.
There are a few things going on here:
We've added a parameter, constant, to the MyDependentTask constructor. This allows us to pass a value to the task constructor when the object is instantiated. In this case, it's an IntParameter. Luigi supports many kinds of parameters, as well as the generic Parameter object. We've constructed the task object by passing the value 5.
We've added a dependency on MyFirstTask by implementing the requires() method. requires() can return either a single task, a list of tasks, or a dictionary whose values are tasks. I use dictionaries, as I find it easier to keep track of named dependencies in subsequent tasks. This means that MyDependentTask will only run once MyFirstTask has completed.
We've referenced the dependency by calling the input() method. This is a wrapper function which lets us access the output of the tasks listed in the requires() method. As we're working with luigi.LocalTarget objects, we need to get the path property to access the appropriate location on the file system for our input.
We've implemented the run() method to read in the output of the previous task, perform a matrix addition incorporating the constant parameter, and save the output.
We've added the instance of MyDependentTask to our graph for execution. Note that because of the automatic dependency resolution, you really only have to pass the final task to the luigi.build() method - luigi will resolve the upstream dependency chain for you and execute everything that needs to be executed.
So if we execute the above code, the output we'd expect to see looks something like this:
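The exact log lines vary by luigi version, but the tail end of the run (luigi's execution summary, shown here with the task names from our example) looks roughly like this:

```
===== Luigi Execution Summary =====

Scheduled 2 tasks of which:
* 2 ran successfully:
    - 1 MyDependentTask(constant=5)
    - 1 MyFirstTask()

This progress looks :) because there were no failed tasks or missing dependencies

===== Luigi Execution Summary =====
```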
Congratulations! You've just built and executed your first luigi job!
With a bit more practice you'll be building complex workflows and automating away a lot of the headache of repeatable tasks. Best of luck with your future data-plumbing endeavors!
Cover Image: Nintendo (obviously)