Connection management and SQL parallelisation for R analytics on big database clusters
Querying PostgreSQL from R is typically done with the RPostgreSQL driver (or the newer
RPostgres). In both cases, however, it's the analyst's responsibility to maintain the connection objects
and pass them with every query. In analytical contexts where data resides in multiple databases (e.g. a sharded setup,
a microservice setup, etc.), the task of maintaining all connection objects quickly becomes tedious.
Furthermore, in many partitioned and sharded data architectures, queries can be parallelized and run simultaneously to
fetch the necessary data more efficiently. Parallelizing the querying, however, means even more complexity for the analyst.
`rport` solves both of these issues by:

- maintaining the connection objects internally, so the analyst refers to connections only by name (as defined in a `database.yml` config)
- parallelizing query execution across connections

Rport is distributed as a lightweight R package, and you can get the most up-to-date version
from GitHub, directly from within an R session:
```r
> library(devtools); install_github('adjust/rport')
```
Next, you'll have to define some PostgreSQL connection settings in YAML format,
by default in `config/database.yml`. See the example `database.yml` in the
repository for reference.
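As a minimal sketch (the exact keys depend on your setup; `host` here is an assumption for non-local databases), a single-connection config could look like:

```yml
# config/database.yml -- minimal single-connection example
db1:
  host: localhost
  database: mydb
  username: postgres
  port: 5432
```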
Given a connection named `db1` (and a running PostgreSQL database),
you can test it with:

```r
library(rport)
db('db1', 'select 1')
```
If successful, you should see output like the following:

```
> db('db1', 'select 1')
2018-04-10 17:09:04 -- 1468 Executing: select 1 on db1
2018-04-10 17:09:05 -- 1468 Done: db1
   ?column?
1:        1
```
Letting `rport` manage the PostgreSQL connectivity in an analytics environment with even a
single database is already very beneficial; this use case is popular and
encouraged. The full benefit of `rport`, however, is unlocked in contexts where
data is partitioned or sharded.

Below are some use cases for `rport` which emphasize the benefits it
offers in handling DB connection objects and distributing SQL queries.
Suppose we have 16 database nodes (shards), with data distributed among them by some
key. Below is a sample `config/database.yml`, which we might have in our
workspace, defining all connection settings.
```yml
shard1:
  database: db1
  username: postgres
  port: 5432
  application_name: rport
shard2:
  database: db2
  username: postgres
  port: 5432
  application_name: rport
# ...
shard16:
  database: db16
  username: postgres
  port: 5432
  application_name: rport
```
Let's say we want to run the following SQL query on every shard and combine the
results for analysis in R. (Note the `country` column and the `GROUP BY` are
needed for the per-shard aggregation and the final in-memory aggregation below.)

```sql
SELECT id, name, city, country, sum(events) AS events
FROM events
WHERE country IN ('de', 'fr', 'bg')
GROUP BY 1, 2, 3, 4
```
To distribute this SQL to all 16 database servers (shards), we can use the
following R code.

```r
library(rport)

sql <- "
  SELECT id, name, city, country, sum(events) AS events
  FROM events
  WHERE country IN ('de', 'fr', 'bg')
  GROUP BY 1, 2, 3, 4
"

# Perform the intermediate (per-shard) aggregation, in parallel on 4 cores by default.
events <- db(paste0('shard', 1:16), sql)

# Perform the final (in-memory) aggregation on the resulting `data.table`.
events <- events[, .(events = sum(events)), by = 'country']
```
One of our products' database models has data partitioned across several thousand
PostgreSQL tables. All tables share the same schema, so we often want to run
analytics on data from many of these tables. See the example data model below,
where each app's data is stored in its own table.

```sql
create table app_1 (id int, title text, created_at date, installs int,...);
create table app_2 (id int, title text, created_at date, installs int,...);
...
create table app_100 (id int, title text, created_at date, installs int,...);
```
To distribute a query over all apps using `rport` in R, you can do:

```r
sql <- sprintf("
  SELECT
    id AS app_id,
    created_at AS date,
    sum(installs) AS installs
  FROM app_%s
  WHERE created_at > '2018-01-01'
  GROUP BY 1, 2
", 1:100)

dat <- db('apps-db', sql)
```
Note that the `sql` variable above is a vector of 100 queries, each differing
from the others only by the table name it reads from. `db('apps-db', sql)` will
distribute those queries in parallel against the single PostgreSQL instance
`apps-db` (as defined in `database.yml`).
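As in the sharded example above, the result `dat` arrives as one combined `data.table`, so a final cross-app aggregation is a single in-memory step, for instance:

```r
# Total installs per date across all 100 apps.
daily <- dat[, .(installs = sum(installs)), by = 'date']
```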
Scaling up the use case above, let's model the raw data on the usage of apps. Each
user's interaction with an app produces rows in our tables:

```sql
create table app_1_20180101 (device_id uuid, created_at timestamp, os_name text, os_version ...);
create table app_1_20180102 (device_id uuid, created_at timestamp, os_name text, os_version ...);
create table app_1_20180103 (device_id uuid, created_at timestamp, os_name text, os_version ...);
...
create table app_2_20180101 (device_id uuid, created_at timestamp, os_name text, os_version ...);
...
```
We'll also spread these tables across multiple PostgreSQL databases:

```sql
create database db1;
create database db2;
create database db3;
...
create database db50;
```
At adjust we actually query hundreds of PostgreSQL databases, where petabytes of
data live according to a similar partitioning scheme. We have a master
PostgreSQL node which contains the metadata determining on which database each
piece of data is stored. Suppose the master instance manages the metadata in a
table like this:

```sql
CREATE TABLE metadata (
  connection_name text,
  app_id int,
  created_at date
)
```
Let's look at how we can run analytical queries with `rport` in R on such a
setup. Say we're interested in estimating the adoption rates of iOS versions and
the activity we see on each version over the last 6 months, from our distributed
raw data.

```r
library(rport)

# Get all DB connections containing data for the last 180 days.
metadata <- db('master', '
  SELECT connection_name, app_id, created_at
  FROM metadata
  WHERE created_at > current_date - 180
')

# SQL query that we want to run on every relevant node.
sql.template <- "
  SELECT os_version, created_at::date, count(*) AS events
  FROM app_%d_%s
  WHERE os_name = 'ios'
  GROUP BY 1, 2
"

# data.table syntax to build the per-table SQL from the connection metadata.
metadata[, sql := sprintf(sql.template, app_id, format(created_at, '%Y%m%d'))]

dat <- db(metadata$connection_name, metadata$sql)
```
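From here, estimating per-version adoption is one more in-memory `data.table` step (a sketch; the `share` column is just one way to express adoption):

```r
# Events per iOS version across all tables, plus each version's share.
adoption <- dat[, .(events = sum(events)), by = 'os_version']
adoption[, share := events / sum(events)]
```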
So far we've assumed that the database connections are defined in
`database.yml`. This doesn't have to be the case: at adjust we define these
connections dynamically using `register.connections()` after reading the
metadata from the master node.
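The exact input format of `register.connections()` is best checked via `?register.connections`; as a rough sketch (the list structure below is an assumption), dynamic registration could look like:

```r
library(rport)

# Hypothetical: connection settings fetched from the master node at runtime.
conns <- list(
  shard1 = list(database = 'db1', username = 'postgres', port = 5432),
  shard2 = list(database = 'db2', username = 'postgres', port = 5432)
)
register.connections(conns)
```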
Shiny is a popular framework for interactive data visualisations in R.
Using the Pool project and `rport`, you can connect Shiny to your
distributed cluster, or simply to all the different databases you might have. Managing
DB configurations in a centralized file makes it much easier to deploy multiple
Shiny apps.
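As a minimal sketch (not using Pool; the `events` table and its columns are made up), a Shiny server can call `db()` directly, with all credentials kept in the central config:

```r
library(shiny)
library(rport)

ui <- fluidPage(
  selectInput('country', 'Country', c('de', 'fr', 'bg')),
  tableOutput('events')
)

server <- function(input, output) {
  output$events <- renderTable({
    # The connection is resolved by name from database.yml,
    # so no credentials live in the app code.
    db('db1', sprintf(
      "SELECT city, sum(events) AS events
       FROM events
       WHERE country = '%s'
       GROUP BY 1", input$country))
  })
}

shinyApp(ui, server)
```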
The main function that `rport` provides is `db()`, which is exemplified in the
use cases above. Here's an overview of the rest of `rport`'s functions.

```r
db.connection        # retrieve a connection object from a connection name
db.disconnect        # disconnect either all open connections or by connection name
list.connections     # get a list of all open database connections
register.connections # register a list of new connections (other than those defined in `database.yml`)
reload.db.config     # reload the `database.yml` connection config file
```

For more details on each of these functions, check their help from R, e.g.
`?db.connection`.
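A quick tour of these helpers (the argument forms are inferred from the descriptions above; consult the R help for specifics):

```r
library(rport)

con <- db.connection('db1')  # the underlying connection object for 'db1'
list.connections()           # list all currently open connections
db.disconnect('db1')         # disconnect a single connection by name...
db.disconnect()              # ...or all open connections at once
reload.db.config()           # re-read the database.yml config file
```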
The R DBI interface already specifies a function called `dbWriteTable`, which the
Postgres driver implements using SQL `COPY`. However, the implementation isn't
flexible enough; among other shortcomings, it gives you no control over the
transaction around the `COPY`, and thus you can't benefit from `COPY` into temp
tables (e.g. `CREATE TABLE my_temp_table (...) ON COMMIT DROP`).

`rport` has a stripped-down implementation which gives you the flexibility to
address these issues. Example usage:
```r
# Suppose we have a table called my_pg_table in the db1 database.
library(data.table)

dat <- data.table(ts = as.POSIXct('2013-01-01 00:00:10'), id = 1:10, bl = TRUE)
con <- db.connection('db1')
tbl.name <- 'my_pg_table'

pg.copy(con, tbl.name, dat)
```
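That flexibility enables, for example, the temp-table pattern mentioned above. A sketch, assuming `pg.copy()` simply issues `COPY` on the connection it is given, inside whatever transaction is currently open:

```r
# COPY into a temp table that disappears on commit (assumed behaviour).
con <- db.connection('db1')
dbGetQuery(con, 'BEGIN')
dbGetQuery(con, 'CREATE TEMP TABLE my_temp_table (ts timestamp, id int, bl boolean) ON COMMIT DROP')
pg.copy(con, 'my_temp_table', dat)
# ... work with my_temp_table here, within the transaction ...
dbGetQuery(con, 'COMMIT')
```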
`rport` allows some configuration through R's `options()` functionality.

By default `rport` looks for a `config/database.yml` file. A custom `database.yml`
location can be given in two ways:

- as an R option: `options('rport-database-yml-file' = '~/my-dir/my-config.yml')`
- as an environment variable: `RPORT_DB_CONFIG=~/my-dir/my-config.yml`
`rport` logs SQL statements on each `db()` call. By default only the first 100
characters of each query are logged. This length can be changed with:

```r
options('rport-max-sql-query-log-length' = 111)
```
The development of Rport has been driven by the internal needs at Adjust, which is a PostgreSQL company. However,
abstracting the RDBMS backend is easily achievable and could be done in future iterations of the project. Contributions
are welcome.
The concept of `database.yml` connection definitions has been borrowed from the Ruby on Rails world. For the time
being this will stay part of `rport`, but we might in the future offer support for other configuration formats, and even
make the YML dependency obsolete.
The idea of a framework for analytics apps is not dead for us, and in a possible future development of such a framework,
`rport` would definitely be a part of it. However, we chose to focus the project on addressing our growing analytics
needs, and we realized we were mainly using the DB connectivity features of `rport`, so we developed those further. Caching
is one example: we found that the `memoise` package was exactly what we needed for the purpose, so there was no
use in duplicating that functionality in `rport`.
`rport` currently builds on the RPostgreSQL driver for PostgreSQL. We follow the development of the RPostgres project
closely and might switch to it as a supported PostgreSQL driver in the future.
To run the test suite of `rport` you'll need Docker. Check the
project's Makefile to find your way around the test suite. Build your feature and
send a Pull Request on GitHub, or just open an issue first.
Nikola Chochkov [email protected], Berlin, adjust GmbH, Germany
This Software is licensed under the MIT License.
Copyright © 2018 adjust GmbH, http://www.adjust.com
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the “Software”), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so,
subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.