Data processing and ETL toolkit for Ruby
metacrunch is a simple and lightweight data processing and ETL (Extract-Transform-Load)
toolkit for Ruby.
$ gem install metacrunch
Note: When upgrading from metacrunch 3.x, there are some breaking changes you need to address. See the notes below for details.
The basic idea behind an ETL job in metacrunch is the concept of a data processing pipeline. Each ETL job reads data from a source (extract step), runs one or more transformations (transform step) on the data and finally loads the transformed data to a destination (load step).
metacrunch gives you a simple DSL (Domain-specific language) to define and run ETL jobs in Ruby. Just create a text file with the extension .metacrunch
and run it with the provided metacrunch
CLI command. Note: The file extension doesn’t really matter but you should avoid .rb
to not loading them by mistake from another Ruby component.
Let’s walk through the main steps of creating ETL jobs with metacrunch. For a collection of working examples check out our metacrunch-demo repository.
Every .metacrunch
job is a regular Ruby file and you can use any valid Ruby code like declaring methods, classes, variables, requiring other Ruby
files and so on.
# File: my_etl_job.metacrunch
def my_helper
# ...
end
class MyHelper
# ...
end
helper = MyHelper.new
require "SomeGem"
require_relative "./some/other/ruby/file"
A source is an object that emits data objects (e.g. from a file or an external system) into the metacrunch processing pipeline. Implementing sources is easy – a source is a Ruby Enumerable
(any object that responds to the #each
method). For more information on how to implement sources see notes below.
You must declare a source to allow a job to run.
A source iterates over it’s entries and emits every entry as a data object into the transformation pipeline, by passing it to the first transformation.
# File: my_etl_job.metacrunch
source [1,2,3,4]
# or ...
source Metacrunch::File::Source.new(ARGV)
# or ...
source MySource.new
To process, transform or manipulate data use the #transformation
hook. A transformation is implemented with a callable
object (any Ruby object that responds to #call
. E.g. a Proc
). To learn more about transformations check the section about implementing transformations below.
The current data object (the current object emitted by the source) will be passed to the first transformation as a parameter. The return value of a transformation will then be passed to the next transformation and so on.
There are two exceptions to that rule:
nil
the current data object will be dismissed and the next transformation won’t be called. The process continues with the next data object that will be emitted by the source and the first transformation.Enumerator
the object will be expanded and the following transformations will be called with each element of the Enumerator
.# File: my_etl_job.metacrunch
# Array implements #each and therefore is a valid source
source [1,2,3,4,5,6,7,8,9]
# A transformation is implemented with a `callable` object (any
# object that responds to #call).
# Proc responds to #call
transformation ->(number) {
# Called for each data object that has been emitted by a source.
# You must return the data to keep it in the pipeline. Dismiss the
# data conditionally by returning nil.
number if number.odd?
}
# Only called for odd numbers as even numbers gets dismissed in the previous
# transformation.
transformation ->(odd_number) {
odd_number * 2
}
# MyTransformation implements #call. Gets called with the prevous number times 2.
transformation MyTransformation.new
Sometimes it is useful to buffer data between transformation steps to allow a transformation to work on larger bulks of data. metacrunch uses a simple transformation buffer to achieve this.
To use a transformation buffer add the :buffer
option to your transformation. You can pass a positive integer value as a buffer size, or as an advanced option you can pass a Proc
object. The buffer flushes every time the buffer reaches the given size or if the Proc
returns true
. The buffer also flushes after the last data object was emitted by the source.
# File: my_etl_job.metacrunch
source 1..95 # A range responds to #each and is a valid source
# A buffer with a fixed size
transformation ->(bulk) {
# this transformation is called when the buffer
# is filled with 10 objects or if the source has
# yielded the last data object.
# bulk would be: [1,...,10], [11,...,20], ..., [91,...,95]
}, buffer: 10
# A buffer that uses a Proc
transformation ->(bulk) {
# Called when the buffer `Proc` returns `true`
}, buffer: -> {
true if some_condition
}
A destination is an object that writes the transformed data to an external system (e.g. a file, database etc.). Implementing destinations is easy – see notes below. A destination receives the return value from the last transformation as a parameter if the return value from the last transformation was not nil
.
# File: my_etl_job.metacrunch
destination MyDestination.new
To run arbitrary code before the first transformation is run on the first data object use the #pre_process
hook. To run arbitrary code after the last transformation is run on the last data object use #post_process
. Like transformations, #post_process
and #pre_process
must be implemented using a callable
object.
pre_process -> {
# Proc responds to #call
}
# MyCallable class defines #call
post_process MyCallable.new
metacrunch has build-in support to parameterize jobs. Using the options
hook you can declare options that can be set/overridden by the CLI when running your jobs.
# File: my_etl_job.metacrunch
options do
add :log_level, "-l", "--log-level LEVEL", "Log level (debug,info,warn,error)", default: "info"
add :database_url, "-d", "--database URL", "Database connection URL", required: true
end
# Prints out 'info'
echo options[:log_level]
In this example we declare two options log_level
and database_url
. log_level
defaults to info
, whereas database_url
has no default and is required. In your job file you can access the option values using the options
Hash. E.g. options[:log_level]
.
To set/override these options use the command line.
$ metacrunch my_etl_job.metacrunch --log-level debug
This will set the options[:log_level]
to debug
.
To get a list of available options for a job, use --help
on the command line.
$ metacrunch my_etl_job.metacrunch --help
Job options:
-l, --log-level LEVEL Log level (debug,info,warn,error)
DEFAULT: info
-d, --database URL Database connection URL
REQUIRED
All non-option arguments that get passed to the job when running are available to the ARGV
constant. If your job requires such arguments (e.g. if you work with a list of files) you can require it.
# File: my_etl_job.metacrunch
options(require_args: true) do
# ...
end
metacrunch comes with a handy command line tool. In a terminal use
$ metacrunch my_etl_job.metacrunch
to run a job.
If you use Bundler to manage dependencies for your jobs make sure to change into the directory where your Gemfile is (or set BUNDLE_GEMFILE environment variable) and run metacrunch with bundle exec
.
$ bundle exec metacrunch my_etl_job.metacrunch
In your job file use Bundler.require
to require the dependencies from your Gemfile
.
# File: my_etl_job.metacrunch
Bundler.require
Use the following syntax to run a metacrunch job
$ [bundle exec] metacrunch [options] JOB_FILE [job-options] [ARGS...]
A metacrunch source is any Ruby Enumerable
object (an object that responds to the #each
method) that yields data objects one by one.
The data is usually a Hash
instance, but could be other structures as long as the rest of your pipeline is expecting it.
Any Enumerable
object (e.g. Array
) responds to #each
and can be used as a source in metacrunch.
# File: my_etl_job.metacrunch
source [1,2,3,4,5,6,7,8,9]
Usually you implement your sources as classes. Doing so you can unit test and reuse them.
Here is a simple CSV source
# File: my_csv_source.rb
require 'csv'
class MyCsvSource
def initialize(input_file)
@csv = CSV.open(input_file, headers: true, header_converters: :symbol)
end
def each
@csv.each do |data|
yield(data.to_hash)
end
@csv.close
end
end
You can then use that source in your job
# File: my_etl_job.metacrunch
require "my_csv_source"
source MyCsvSource.new("my_data.csv")
A metacrunch transformation is implemented as a callable
object. A callable
in Ruby is any object that responds to the #call
method.
Proc
s in Ruby respond to #call
. They can be used to implement transformations inline.
# File: my_etl_job.metacrunch
transformation -> (data) do
# ...
end
Like sources you can create classes to test and reuse transformation logic.
# File: my_transformation.rb
class MyTransformation
def call(data)
# ...
end
end
You can use this transformation in your job
# File: my_etl_job.metacrunch
require "my_transformation"
transformation MyTransformation.new
A destination is any Ruby object that responds to #write(data)
and #close
.
Like sources you are encouraged to implement destinations as classes.
# File: my_destination.rb
class MyDestination
def write(data)
# Write data to files, remote services, databases etc.
end
def close
# Use this method to close connections, files etc.
end
end
In your job
# File: my_etl_job.metacrunch
require "my_destination"
destination MyDestination.new
When upgrading from metacrunch 3.x, there are some breaking changes you need to address.
source
and destination
. If you have more than one in your job file the last definition will used.transformation_buffer
anymore. Instead set buffer
as an option to transformation
.transformation
, pre_process
and post_process
can’t be implemented using a block anymore. Always use a callable
(E.g. Lambda, Proc or any object responding to #call
).@@
.args
function used to get the non-option arguments passed to a job has been removed. Use ARGV
instead.Metacrunch::Db
classes have been moved into the metacrunch-db gem package.Metacrunch::Redis
classes have been moved into the metacrunch-redis gem package.Metacrunch::File
classes have been moved into the metacrunch-file gem package.metacrunch is available at github under MIT license.