PiecePipe – Pipe dreams in Ruby

Shawn Anderson and I recently released a Ruby Gem called PiecePipe, a library that facilitates what we’re loosely referring to as “pipeline-style programming”, which means we’re expressing (and solving) some of our complex problems in terms of a flow of data through a series of separately-implemented steps.

  • gem install piece_pipe

Our goals:

  • Maximize expressiveness, at a high level, of complex algorithms
  • Focus on interesting, per-item calculations
  • Decouple calculations from details of iteration

For example, you should be able to discern the intent of the following code, even without understanding PiecePipe, the implementation of the steps, or the problem domain:


PiecePipe::Pipeline.new.
  source([{region: region}]).
  step(FetchPowerPlantsByRegion).
  step(FindWorstReactor).
  step(DetermineStatusClass).
  step(BuildPlantHealthSummary).
  step(SortByRadiationLevelsDescending).
  collect(:plant_health_summary).
  to_enum

(To see the actual step implementations, such as FindWorstReactor, visit the full nuke plant example.).

In this case, we’re adding steps by referring to classes named for their primary and only purpose. These are subclasses of PiecePipe::Step, whose instances will have their respective #process methods called on them. (Actually, in this example, most of the steps are AssemblyStep subclasses overriding #receive… more on that later).

The pipeline is bootstrapped with a source that responds to .to_enum. The example uses an array with a single Hash in it containing the region parameter. Then things start to get interesting…

Each Step is responsible for describing how to process a given object as it flows through the pipeline, by doing something with the input and producing zero or more results by calling the #produce method. Because #produce may be invoked any number of times, a Step may transform one object into another (by invoking #produce exactly once for each input), expand the object into many sequential results (eg, by calling #produce for each item in a collection owned by the input object) or filter objects out of the pipeline entirely (by refusing to invoke #produce for certain inputs).

These capabilities aren’t much different than #map, #flat_map, #reject and #select, all available via Ruby’s Enumerable module, except that there is no assumption that we’re processing an enumeration or sequence that supports such operations. Nor are any of the built-in Enumerable functions capable of lazy evaluation.

A transforming Step might look like:


# Given an id, load and produce a Person instance
class LoadPersonById < PiecePipe::Step
  def process(person_id)
    produce Person.find(id)
  end
end

A filtering (contractive) Step might take the form:


# Do not proceed with the processing of a Person unless they're local
class KeepLocals < PiecePipe::Step
  def process(person)
    produce person if person.lives_in_our_neighborhood?
  end
end

An iterating (expansive) Step might be implemented as:


# For any given Person, continue processing that Person plus their family members
class IncludeFamilyMembers < PiecePipe::Step
  def process(person)
    produce person
    person.family_members.each do |fam_member|
      produce fam_member
    end
  end
end

A common problem we encountered as we started breaking our code into pipelines was: a lot of Steps need information available much earlier in the pipeline, but which the intervening Steps have no need to know about. And accepting and passing extraneous values into and out of those intervening steps would defeat our goals of isolated responsibility, decoupling and simplicity.

Enter PiecePipe::AssemblyStep. AssemblySteps assume that their input objects are Hashes, and that they will be producing partial results by adding keys to that Hash. AssemblySteps receive input via #receive and produce output via #install(new_info={}). Metaphorically, we're imitating assembly line production where each station makes an addition or modification to an increasingly complex object as it proceeds through a factory, including scaffolding and annotations that appear along the way, inform the intermediate tasks, and are remove before the final product rolls off the line.


# For any given power plant, determine the worst reactor. 
# Implemented as an AssemblyStep that analyzes inputs[:power_plant] from the prior Step,
# and installs a new key/val pair for :worst_reactor.  
class FindWorstReactor < PiecePipe::AssemblyStep
  def receive(inputs)
    # Figure out which reactor has the highest radiation levels.
    # "install" works a lot like "produce", but rather than take responsibility for the totality
    # of the produced object, we're just saying "add :worst_reactor to whatever's there and pass it on".
    install worst_reactor: inputs[:power_plant].reactors.reject(&:offline?).max_by(:&radiation)
  end
end

AssemblySteps free us up to write code that focuses only on converting relevant input to simple output, without taking responsibility for the overall state of an object as it passes through the pipeline.

(Observation: as AssemblySteps begin to stack up, we introduce a high degree of connascence-of-name between the individual step definitions... early steps must produce output keys that meet the expectations of various downstream steps. This reduces the individual reusability of each step, and we're pondering some techniques to reduce this coupling.)

The SortByRadiationLevelsDescending step is interesting in that it overrides #generate_sequence instead of #process. This Step needs to collect all incoming items and sort them before producing the sorted sequence one item at a time. (At some point we hope to encapsulate this a little better; right now you need to know some of PiecePipe's secrets to create steps like this.)

In our final processing step, we extract the "production" parts from our assembly line via Pipeline#collect(:plant_health_summary), which is a built-in Step, paramaterized with the key you'd like to extract. In this case, the sequence of objects flowing through our pipeline--namely, Hashes with a number of keys--will transform into a series of PlantHealthSummary instances as produced by the BuildPlantHealthSummary step.

We call #to_enum to produce a sequence that can be consumed by the outside world. (Btw, not a single step will execute in the pipeline until someone accesses elements in this enumeration. Lazy eval is good.)

Some parting notes:

Conversation
  • […] in the room could think of any Ruby gem that could act as a pipeline. I may have found one by Atomic Objects in Michigan. It’s called Piece Pipe. I have not looked at it too closely yet (I am writing […]

  • […] in June, Dave wrote a blog post about the PiecePipe gem, but he left out the usage of MapStep and […]

  • Comments are closed.