
Streaming Data with Ruby Enumerators

By Ville Lautanala on July 6, 2017

Streaming is an efficient method of handling large collections of data. Working with streaming data in Ruby using blocks is clunky compared to the Node.js Stream API, where streams can be easily composed. In this blog post, we share how we combined ideas from Node.js Streams with Ruby enumerables to build composable streams in Ruby. This has helped us scale our feed processing to a whopping pace of over 1 million products processed each minute.

A key aspect of our product is managing our customers’ product data. We fetch a feed from their server, convert it to Facebook’s format, enhance it with image templates, and upload it to S3, from where Facebook then fetches the feed file.

The feeds can be huge: a large ecommerce platform may have millions of items in their catalog. Facebook uses this catalog information to dynamically generate ads that are suitable for specific users. This is why our customers typically want to upload their full catalog to Facebook. 

The feed processing is quite easy to explain visually:

[Figure: the feed processing pipeline — HTTP fetch → gzip decode → XML parse → S3 upload]

Reading the feed file over HTTP as a stream of bytes and uploading the end result to S3 before we’ve even finished downloading the file makes the process efficient. We don’t have to store a multi-gigabyte temporary file on disk or load the whole feed into memory to be able to process it. This also makes the process faster: we can start working on the file content before the last byte has been received.

Our design goals for the feed processing were:

  1. The code should have the same structure as the block diagram
  2. Allow streaming
  3. Composability: the same building blocks should power both instant previews of the origin XML/JSON/CSV data and the feed parsing done in background jobs

 

Using blocks for streaming

It’s straightforward to implement a streaming HTTP request using Ruby blocks. Let’s see what a solution for HTTP fetching and gzip decompression could look like:

decoder = EM::HttpDecoders::GZip.new do |decompressed|
  # Forward to XML parser
end

request = Typhoeus::Request.new("http://www.example.com/feed.xml")
request.on_body do |chunk|
  decoder << chunk
end
request.run
decoder.finalize!

Is this hard to read? No. Would I like to extend this solution with more similar blocks? No, having multiple such blocks depending on each other would quickly become a tangled mess.

To accommodate more complexity, we need to extract the HTTP request, gzip decoding and XML parsing into separate entities. For example, we might also need to convert the character set from Latin-1 to UTF-8 before passing the data to the XML parser.

 

Ruby Enumerator

Ruby has lazy enumerables, which are stream-like objects. There’s just one caveat: the built-in Enumerable interface does not offer any mechanism for running setup or cleanup code before or after iterating over the lazy collection.

In addition to the Enumerable module, Ruby also has an Enumerator class. With Enumerator, you are in full control of what is yielded to the consumer and when. This allows initialization and cleanup to happen around the iteration while still keeping each stage of the process as a composable value.
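As a tiny illustration (not part of our feed code), nothing inside an Enumerator block runs until the enumerator is consumed, and code placed before and after the yields acts as setup and cleanup:

numbers = Enumerator.new do |yielder|
  puts 'setup'                 # e.g. open a connection
  3.times { |i| yielder << i }
  puts 'cleanup'               # e.g. close the connection
end

numbers.each { |n| puts n }    # prints setup, 0, 1, 2, cleanup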

The streaming HTTP request can be implemented as:

module HTTPFetch
  def self.call(url)
    uri = URI(url)
    Enumerator.new do |yielder|
      Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == 'https') do |http|
        request = Net::HTTP::Get.new(uri.request_uri)
        http.request request do |response|
          response.read_body { |chunk| yielder << chunk }
        end
      end
    end
  end
end

Gzip decoding implemented with the same pattern would become: 

module GZipDecode
  def self.call(enumerable)
    Enumerator.new do |yielder|
      decoder = EM::HttpDecoders::GZip.new do |chunk|
        yielder << chunk
      end

      enumerable.each { |chunk| decoder << chunk }
      decoder.finalize!
    end
  end
end

Implementing similar functionality for character set conversion or parsing XML isn’t any harder – assuming they can take in data in small chunks.
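For example, a character set conversion pipe could be sketched roughly like this, assuming Latin-1 input (Encoding::Converter carries its conversion state between chunks, and finish flushes anything still buffered):

module CharsetConvert
  def self.call(enumerable)
    Enumerator.new do |yielder|
      converter = Encoding::Converter.new('ISO-8859-1', 'UTF-8')
      # Re-encode each chunk as it arrives
      enumerable.each { |chunk| yielder << converter.convert(chunk) }
      # Flush whatever the converter still holds
      yielder << converter.finish
    end
  end
end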

 

Composing Enumerators with Piperator

The callable modules shown before can be combined quite easily:

GZipDecode.call(HTTPFetch.call(url))

In that code, decompression appears before the HTTP fetch, which is the reverse of the actual data flow.

The feed processing model shown in the figure could instead be represented in code as:

Pipeline
  .pipe(HTTPFetch)
  .pipe(GZipDecode)

We’ve implemented this functionality in a gem called Piperator. The gist of the Pipeline implementation in Piperator is only a little glue code:

class Piperator::Pipeline
  def self.pipe(callable)
    Pipeline.new([callable])
  end
 
  def initialize(pipes = [])
    @pipes = pipes
  end
 
  def call(enumerable = [])
    # Thread the enumerable through each pipe in order
    @pipes.reduce(enumerable) { |memo, pipe| pipe.call(memo) }
  end
  
  def pipe(other)
    Pipeline.new(@pipes + [other])
  end
end

That enables the ideal code from before:

result = Piperator::Pipeline
  .pipe(HTTPFetch)
  .pipe(GZipDecode)
  .call(url)

The result is a lazy enumerable. To get any values out of the collection, it must be iterated over. For example, result.each(&:display) would print each chunk.

An interesting property of this approach is that a Pipeline itself uses the same callable interface as the individual pipes: it takes an enumerable and returns one. Pipelines can therefore be added to other pipelines as parts, just like regular callable objects.
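For example, a partial pipeline can be reused as a single step inside another pipeline (the URL is just a placeholder):

fetch_and_decode = Piperator::Pipeline
  .pipe(HTTPFetch)
  .pipe(GZipDecode)

# The nested pipeline behaves exactly like listing its pipes one by one
Piperator::Pipeline
  .pipe(fetch_and_decode)
  .call('http://www.example.com/feed.xml')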

Let’s use Piperator to compute the uncompressed length of a GZip file over HTTP without storing the content to disk or memory.


length = proc do |enumerable|
  enumerable.lazy.sum(&:length)
end
 
Piperator::Pipeline
  .pipe(HTTPFetch)
  .pipe(GZipDecode)
  .pipe(length)
  .call('http://ftp.gnu.org/gnu/gzip/gzip-1.2.4.tar.gz')

 

Enumerators as I/O objects

The examples above have only used the pipeline objects in situations where everything is built for streaming from the ground up. When processing XML with Nokogiri – the most popular XML parser for Ruby – this unfortunately is not the case.

Nokogiri can do streaming, but it expects the input to be an IO object. This isn’t a problem if you first store the input to a temporary file before streaming, but that extra step increases the time spent processing the feed.

To enable streaming over HTTP, we've also implemented a pseudo IO object for Enumerators. It doesn't implement all of the Ruby IO functionality, but enough for at least Nokogiri and Oj to work.

A simple XMLParser using this pattern could look like this:

module XMLParser
  def self.call(enumerable)
    Enumerator.new do |yielder|
      io = Piperator::IO.new(enumerable)
      reader = Nokogiri::XML::Reader(io)
      reader.each do |node|
        yielder << build_product(node) if product?(node)
      end
    end
  end
end

This pattern still needs an implementation to detect and parse the product items from the stream of nodes. We're using XPath-like selectors for detection, which also means that the parser needs to keep track of the XML node hierarchy. For parsing, matching nodes are converted to Nokogiri::XML::Document objects. It isn’t efficient, but it lets us run full XPath queries when a particular client's XML structure calls for it.
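As a rough, hypothetical sketch (the element and field names here are made up, not our actual selectors), those helpers could look something like this:

module XMLParser
  # Match <product> element nodes in the reader stream
  def self.product?(node)
    node.name == 'product' &&
      node.node_type == Nokogiri::XML::Reader::TYPE_ELEMENT
  end

  # Re-parse the node's outer XML into a document so full XPath is available
  def self.build_product(node)
    doc = Nokogiri::XML(node.outer_xml)
    { id: doc.at_xpath('//id')&.text, title: doc.at_xpath('//title')&.text }
  end
end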



Assembling a feed processing pipeline

As a last step, we upload the feed to an S3 bucket. S3 multipart uploads accept parts of at least 5 megabytes (only the final part may be smaller). A separate pipe block is responsible for buffering the chunks and creating the multipart upload to S3. That pipe is left as an exercise for the reader.
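As a starting point for that exercise, here is a minimal sketch of just the buffering half, assuming the products have already been serialized into strings by an earlier pipe:

PART_SIZE = 5 * 1024 * 1024  # S3's minimum size for non-final parts

S3Buffer = proc do |enumerable|
  Enumerator.new do |yielder|
    buffer = String.new
    enumerable.each do |chunk|
      buffer << chunk
      if buffer.bytesize >= PART_SIZE
        yielder << buffer
        buffer = String.new
      end
    end
    # Emit whatever is left as the (possibly smaller) final part
    yielder << buffer unless buffer.empty?
  end
end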

The end result is as desired:

Piperator::Pipeline
  .pipe(HTTPFetch)
  .pipe(GZipDecode)
  .pipe(XMLParser)
  .pipe(S3Upload)
  .call("https://example.com/feed.xml")
# => returns e.g. an S3 file id

We’ve used the Pipeline pattern successfully in Smartly.io’s feed processing. Even after extending the processing to 17 distinct steps responsible for adding image templates, logging, error handling, and so on, the code is still readable. The end result combines the parts we found most useful from Node.js streams, reactive programming, and the Elixir pipe operator.

Would you like to tackle challenges of huge scale? See what it’s like to be an engineer at Smartly.io at smartly.io/developer
