PostgreSQL R2DBC Driver
This project contains the PostgreSQL implementation of the R2DBC SPI. This implementation is not intended to be used directly, but rather to be used as the backing implementation for a humane client library to delegate to.
This driver provides the following features:
- Login credentials provided as Supplier<String> or Publisher<String>
- Fetching of REFCURSOR using io.r2dbc.postgresql.api.RefCursor
- Extension points to register Codecs to handle additional PostgreSQL data types
This project is governed by the Code of Conduct. By participating, you are expected to uphold this code of conduct. Please report unacceptable behavior to [email protected].
Here is a quick teaser of how to use R2DBC PostgreSQL in Java:
URL Connection Factory Discovery
ConnectionFactory connectionFactory = ConnectionFactories.get("r2dbc:postgresql://<host>:5432/<database>");
Publisher<? extends Connection> connectionPublisher = connectionFactory.create();
Programmatic Connection Factory Discovery
Map<String, String> options = new HashMap<>();
options.put("lock_timeout", "10s");
options.put("statement_timeout", "5m");
ConnectionFactory connectionFactory = ConnectionFactories.get(ConnectionFactoryOptions.builder()
    .option(DRIVER, "postgresql")
    .option(HOST, "...")
    .option(PORT, 5432) // optional, defaults to 5432
    .option(USER, "...")
    .option(PASSWORD, "...")
    .option(DATABASE, "...") // optional
    .option(OPTIONS, options) // optional
    .build());
Publisher<? extends Connection> connectionPublisher = connectionFactory.create();
// Alternative: Creating a Mono using Project Reactor
Mono<Connection> connectionMono = Mono.from(connectionFactory.create());
Supported ConnectionFactory Discovery Options
Option | Description |
---|---|
ssl | Enables SSL usage (SSLMode.VERIFY_FULL). |
driver | Must be postgresql. |
protocol | Protocol specifier. Empty to use single-host operations. Supported: failover for multi-server failover operations. (Optional) |
host | Server hostname to connect to. May contain a comma-separated list of hosts with ports when using the failover protocol. |
port | Server port to connect to. Defaults to 5432. (Optional) |
socket | Unix Domain Socket path to connect to as an alternative to TCP. (Optional) |
username | Login username. Can be a plain String, Supplier<String>, or Publisher<String>. |
password | Login password. Can be a plain CharSequence, Supplier<CharSequence>, or Publisher<CharSequence>. (Optional when using TLS Certificate authentication) |
database | Database to select. (Optional) |
applicationName | The name of the application connecting to the database. Defaults to r2dbc-postgresql. (Optional) |
autodetectExtensions | Whether to auto-detect and register Extensions from the class path. Defaults to true. (Optional) |
compatibilityMode | Enable compatibility mode for cursored fetching. Required when using newer pgpool versions. Defaults to false. (Optional) |
errorResponseLogLevel | Log level for error responses. Any of OFF, DEBUG, INFO, WARN or ERROR. Defaults to DEBUG. (Optional) |
extensions | Collection of Extension to provide additional extensions when creating a connection factory. Defaults to empty. (Optional) |
fetchSize | The default number of rows to return when fetching results. Defaults to 0 for unlimited. (Optional) |
forceBinary | Whether to force binary transfer. Defaults to false. (Optional) |
hostRecheckTime | Host status recheck time when using multi-server operations. Defaults to 10 seconds. (Optional) |
loadBalanceHosts | Whether to shuffle the list of given hostnames before connecting when using multi-server operations. Defaults to true. (Optional) |
loopResources | TCP/Socket LoopResources (depends on the endpoint connection type). (Optional) |
lockWaitTimeout | Lock wait timeout. (Optional) |
noticeLogLevel | Log level for notice responses. Any of OFF, DEBUG, INFO, WARN or ERROR. Defaults to DEBUG. (Optional) |
preferAttachedBuffers | Configure whether codecs should prefer attached data buffers. The default is false, meaning that codecs will copy data from the input buffer into a byte array. Enabling attached buffers requires consumption of values such as Json to avoid memory leaks. |
preparedStatementCacheQueries | Determine the number of queries that are cached in each connection. The default is -1, meaning there is no limit. The value of 0 disables the cache. Any other value specifies the cache size. |
options | A Map<String, String> of connection parameters. These are applied to each database connection created by the ConnectionFactory. Useful for setting generic PostgreSQL connection parameters. (Optional) |
schema | The search path to set. (Optional) |
sslMode | SSL mode to use, see SSLMode enum. Supported values: DISABLE, ALLOW, PREFER, REQUIRE, VERIFY_CA, VERIFY_FULL, TUNNEL. (Optional) |
sslRootCert | Path to SSL CA certificate in PEM format. Can also be a resource path. (Optional) |
sslKey | Path to SSL key for TLS authentication in PEM format. Can also be a resource path. (Optional) |
sslCert | Path to SSL certificate for TLS authentication in PEM format. Can also be a resource path. (Optional) |
sslPassword | Key password to decrypt SSL key. (Optional) |
sslHostnameVerifier | javax.net.ssl.HostnameVerifier implementation. (Optional) |
sslSni | Enable/disable SNI to send the configured host name during the SSL handshake. Defaults to true. (Optional) |
statementTimeout | Statement timeout. (Optional) |
targetServerType | Type of server to use when using multi-host operations. Supported values: ANY, PRIMARY, SECONDARY, PREFER_SECONDARY. Defaults to ANY. (Optional) |
tcpNoDelay | Enable/disable TCP NoDelay. Enabled by default. (Optional) |
tcpKeepAlive | Enable/disable TCP KeepAlive. Disabled by default. (Optional) |
timeZone | Configure the session timezone to control conversion of local temporal representations. Defaults to TimeZone.getDefault(). (Optional) |
Programmatic Configuration
Map<String, String> options = new HashMap<>();
options.put("lock_timeout", "10s");
PostgresqlConnectionFactory connectionFactory = new PostgresqlConnectionFactory(PostgresqlConnectionConfiguration.builder()
    .host("...")
    .port(5432) // optional, defaults to 5432
    .username("...")
    .password("...")
    .database("...") // optional
    .options(options) // optional
    .build());
// PostgresqlConnectionFactory.create() emits the driver-specific PostgresqlConnection
Mono<PostgresqlConnection> mono = connectionFactory.create();
PostgreSQL uses index parameters that are prefixed with $. The following SQL statement makes use of parameters:
INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3)
Parameters are referenced using the same identifiers when binding these:
mono.flatMapMany(connection -> connection
    .createStatement("INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3)")
    .bind("$1", 1)
    .bind("$2", "Walter")
    .bind("$3", "White")
    .execute());
Binding also allows positional (zero-based) index references. The parameter index is derived from the parameter discovery order when parsing the query.
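To make the zero-based convention concrete, here is a small JDK-only sketch that lists the $n placeholders of a statement in the order they are discovered and pairs each with its position. It is an illustration, not the driver's parser: the class name ParameterIndexes, the method discover, and the regular expression are assumptions, and the sketch ignores details such as placeholders inside string literals.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ParameterIndexes {

    // Assumption: a placeholder is a dollar sign followed by one or more digits.
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\d+");

    /** Returns placeholder -> zero-based index, in discovery order. */
    static Map<String, Integer> discover(String sql) {
        Map<String, Integer> indexes = new LinkedHashMap<>();
        Matcher matcher = PLACEHOLDER.matcher(sql);
        while (matcher.find()) {
            // putIfAbsent keeps the first position if a placeholder repeats
            indexes.putIfAbsent(matcher.group(), indexes.size());
        }
        return indexes;
    }

    public static void main(String[] args) {
        String sql = "INSERT INTO person (id, first_name, last_name) VALUES ($1, $2, $3)";
        discover(sql).forEach((name, index) -> System.out.println(name + " -> index " + index));
    }
}
```

For the INSERT statement above, this reading means that .bind(0, 1) addresses the same parameter as .bind("$1", 1).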
Artifacts can be found on Maven Central.
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <version>${version}</version>
</dependency>
If you would rather use the latest snapshots of the upcoming major version, use our Maven snapshot repository and declare the appropriate dependency version.
<dependency>
  <groupId>org.postgresql</groupId>
  <artifactId>r2dbc-postgresql</artifactId>
  <version>${version}.BUILD-SNAPSHOT</version>
</dependency>
<repository>
  <id>sonatype-nexus-snapshots</id>
  <name>Sonatype OSS Snapshot Repository</name>
  <url>https://oss.sonatype.org/content/repositories/snapshots</url>
</repository>
To support simple connection fail-over, it is possible to define multiple endpoints (host and port pairs) in the connection URL, separated by commas. The driver tries once to connect to each of them in order until a connection succeeds. If none succeeds, a normal connection exception is thrown. Make sure to specify the failover protocol.
The syntax for the connection URL is:
r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3
For example, an application can create two connection pools: one for writes and another for reads. The write pool limits connections to the primary node:
r2dbc:postgresql:failover://user:foo@host1:5433,host2:5432,host3?targetServerType=primary
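As a minimal sketch of the URL syntax above, the following helper assembles a failover URL from host and port pairs using only the JDK. The class name FailoverUrls, the method failoverUrl, and the Endpoint holder are hypothetical names chosen for illustration and are not part of the driver. The sketch does not URL-encode credentials, so it assumes the user name and password contain no reserved characters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringJoiner;

public class FailoverUrls {

    /** Hypothetical holder for a host and an optional port (null leaves the port out). */
    static final class Endpoint {
        final String host;
        final Integer port;

        Endpoint(String host, Integer port) {
            this.host = host;
            this.port = port;
        }

        @Override
        public String toString() {
            return port == null ? host : host + ":" + port;
        }
    }

    /** Joins the endpoints with commas and prefixes the failover protocol. */
    static String failoverUrl(String user, String password, List<Endpoint> endpoints, String targetServerType) {
        StringJoiner hosts = new StringJoiner(",");
        for (Endpoint endpoint : endpoints) {
            hosts.add(endpoint.toString());
        }
        String url = "r2dbc:postgresql:failover://" + user + ":" + password + "@" + hosts;
        // targetServerType is optional; null leaves the query string out
        return targetServerType == null ? url : url + "?targetServerType=" + targetServerType;
    }

    public static void main(String[] args) {
        List<Endpoint> endpoints = new ArrayList<>();
        endpoints.add(new Endpoint("host1", 5433));
        endpoints.add(new Endpoint("host2", 5432));
        endpoints.add(new Endpoint("host3", null));

        System.out.println(failoverUrl("user", "foo", endpoints, null));
        System.out.println(failoverUrl("user", "foo", endpoints, "primary"));
    }
}
```

The resulting string can be passed to ConnectionFactories.get(…) in the same way as the single-host URL shown earlier.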
R2DBC Postgres supports both the simple and the extended message flow.
Cursored fetching is activated by configuring a fetchSize. Postgres cursors are valid for the duration of a transaction. R2DBC can use cursors in auto-commit mode (Execute and Flush) so that no explicit transaction (BEGIN…COMMIT/ROLLBACK) is required. Newer pgpool versions don’t support this feature. To work around this limitation, either use explicit transactions when configuring a fetch size or enable compatibility mode. Compatibility mode avoids cursors in auto-commit mode (Execute with no limit + Sync). Cursors in a transaction use Execute (with fetch size as limit) + Sync as message flow.
Listen and Notify provide a simple signaling or inter-process communication mechanism for processes accessing the same PostgreSQL database. For Listen/Notify, two actors are involved: the sender (notify) and the receiver (listen). The following example uses two connections to illustrate how they work together:
PostgresqlConnection sender = …;
PostgresqlConnection receiver = …;
Flux<Notification> listen = receiver.createStatement("LISTEN mymessage")
    .execute()
    .flatMap(PostgresqlResult::getRowsUpdated)
    .thenMany(receiver.getNotifications());
Mono<Void> notify = sender.createStatement("NOTIFY mymessage, 'Hello World'")
    .execute()
    .flatMap(PostgresqlResult::getRowsUpdated)
    .then();
Upon subscription, the first connection enters listen mode and publishes incoming Notifications as a Flux. The second connection broadcasts a notification to the mymessage channel upon subscription.
Postgres supports additional options when starting a transaction. In particular, the following options can be specified:
- Isolation level (isolationLevel) (reset after the transaction to previous value)
- Transaction mutability (readOnly)
- Deferrable mode (deferrable)
These options can be specified upon transaction begin to start the transaction and apply options in a single command roundtrip:
PostgresqlConnection connection = …;
connection.beginTransaction(PostgresTransactionDefinition.from(IsolationLevel.SERIALIZABLE).readOnly().notDeferrable());
See also: https://www.postgresql.org/docs/current/sql-begin.html
PostgreSQL supports JSON by storing values in JSON/JSONB columns. These values can be consumed and written using the regular R2DBC SPI and by using driver-specific extensions with the io.r2dbc.postgresql.codec.Json type.
You can choose from two approaches:
- Use the Json wrapper type.
- Use scalar types such as byte[] and String.
The difference between the Json type and scalar types is that Json values are written encoded as JSONB to the database. byte[] and String types are represented as BYTEA and VARCHAR respectively and require casting ($1::JSON) when used with parameterized statements.
The following code shows INSERT and SELECT cases for JSON interaction:
CREATE TABLE my_table (my_json JSON);
Write JSON
connection.createStatement("INSERT INTO my_table (my_json) VALUES($1)")
    .bind("$1", Json.of("{\"hello\": \"world\"}")).execute();
Consume JSON
connection.createStatement("SELECT my_json FROM my_table")
    .execute()
    .flatMap(it -> it.map((row, rowMetadata) -> row.get("my_json", Json.class)))
    .map(Json::asString);
Write JSON using casting
connection.createStatement("INSERT INTO my_table (my_json) VALUES($1::JSON)")
    .bind("$1", "{\"hello\": \"world\"}").execute();
Consume JSON as scalar type
connection.createStatement("SELECT my_json FROM my_table")
    .execute()
    .flatMap(it -> it.map((row, rowMetadata) -> row.get("my_json", String.class)));
The following types are supported for JSON exchange:
- io.r2dbc.postgresql.codec.Json
- ByteBuf (must be released after usage to avoid memory leaks)
- ByteBuffer
- byte[]
- String
- InputStream (must be closed after usage to avoid memory leaks)
CITEXT is a built-in extension to support case-insensitive text columns. By default, the driver sends all string values as VARCHAR, which cannot be used directly with CITEXT (without casting or converting values in your SQL).
If you cast input, then you can send parameters to the server without further customization of the driver:
CREATE TABLE test (ci CITEXT);
SELECT ci FROM test WHERE ci = $1::citext;
If you want to send individual String values in a CITEXT-compatible way, then use Parameters.in(…):
connection.createStatement("SELECT ci FROM test WHERE ci = $1")
    .bind("$1", Parameters.in(PostgresqlObjectId.UNSPECIFIED, "Hello"))
    .execute();
If you do not have control over the created SQL or you want to send all String values in a CITEXT-compatible way, then you can customize the driver configuration by registering a StringCodec to send String values with the UNSPECIFIED OID to let Postgres infer the value type from the provided values:
Builder builder = PostgresqlConnectionConfiguration.builder();
builder.codecRegistrar((connection, allocator, registry) -> {
    registry.addFirst(new StringCodec(allocator, PostgresqlObjectId.UNSPECIFIED, PostgresqlObjectId.VARCHAR_ARRAY));
    return Mono.empty();
});
You can also register the CodecRegistrar as an Extension so that it is auto-detected during ConnectionFactory creation.
The driver can consume cursors that were created by PL/pgSQL as refcursor.
Cursors are represented as RefCursor objects. Cursors obtained from Result can be used to fetch the cursor directly.
Since cursors are stateful, they must be closed once they are no longer in use.
connection.createStatement("SELECT show_cities_multiple()").execute()
    .flatMap(result -> result.map((row, rowMetadata) -> row.get(0, RefCursor.class)))
    .flatMap(cursor -> {
        // then(…) discards the fetched values and completes once the cursor is closed
        Mono<Void> data = cursor.fetch()
            .flatMap(…)
            .then(cursor.close());
        return data;
    });
PostgreSQL allows replication streaming and decoding persistent changes to a database’s tables into useful chunks of data.
In PostgreSQL, logical decoding is implemented by decoding the contents of the write-ahead log, which describe changes on a storage level, into an application-specific form such as a stream of tuples or SQL statements.
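Consuming the replication stream is a four-step process:
1. Obtain a replication connection via PostgresqlConnectionFactory.replication().
2. Create a replication slot.
3. Start replication using the replication slot.
4. Consume and map the binary data using ReplicationStream.map(…).
On application shutdown, close() the ReplicationStream.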
Note that a connection is busy once the replication is active and a connection can have at most one active replication stream.
Mono<PostgresqlReplicationConnection> replicationMono = connectionFactory.replication();
// later:
ReplicationSlotRequest request = ReplicationSlotRequest.logical()
    .slotName("my_slot")
    .outputPlugin("test_decoding")
    .temporary()
    .build();
Mono<ReplicationSlot> createSlot = replicationConnection.createSlot(request);
ReplicationRequest replicationRequest = ReplicationRequest.logical()
    .slotName("my_slot")
    .startPosition(LogSequenceNumber.valueOf(0))
    .slotOption("skip-empty-xacts", true)
    .slotOption("include-xids", false)
    .build();
Flux<T> replicationStream = replicationConnection.startReplication(replicationRequest).flatMapMany(it -> {
    return it.map(byteBuf -> {…})
        .doOnError(t -> it.close().subscribe());
});
Applications may make use of Postgres enumerated types by using EnumCodec to map custom types to Java enum types.
EnumCodec requires the Postgres OID and the Java enum type to map enum values to the Postgres protocol and to materialize Enum instances from Postgres results.
You can configure a CodecRegistrar through EnumCodec.builder() for one or more enumeration type mappings. Make sure to use different Java enum types; otherwise, the driver is not able to distinguish between Postgres OIDs.
Example:
SQL:
CREATE TYPE my_enum AS ENUM ('FIRST', 'SECOND');
Java Model:
enum MyEnumType {
FIRST, SECOND;
}
Codec Registration:
PostgresqlConnectionConfiguration.builder()
    .codecRegistrar(EnumCodec.builder().withEnum("my_enum", MyEnumType.class).build());
When available, the driver also registers an array variant of the codec.
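The conversion between Postgres enum labels and Java constants can be pictured with plain Java. The sketch below assumes that the labels match the Java constant names exactly, as in the my_enum example above. The class EnumLabelMapping and its methods are hypothetical and do not reproduce the codec's internals.

```java
public class EnumLabelMapping {

    enum MyEnumType {
        FIRST, SECOND
    }

    /** Java constant -> label text sent to Postgres (assumes the label equals the constant name). */
    static String toLabel(Enum<?> value) {
        return value.name();
    }

    /** Label text received from Postgres -> Java constant; unknown labels raise IllegalArgumentException. */
    static <E extends Enum<E>> E fromLabel(Class<E> type, String label) {
        return Enum.valueOf(type, label);
    }

    public static void main(String[] args) {
        System.out.println(toLabel(MyEnumType.SECOND));
        System.out.println(fromLabel(MyEnumType.class, "FIRST"));
    }
}
```

Under this assumption the match is case-sensitive, so a type created with lowercase labels such as 'first' would not resolve to the constant FIRST.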
This reference table shows the type mapping between PostgreSQL and Java data types:
Types in bold indicate the native (default) Java type.
Support for the following single-dimensional arrays (read and write):
This driver accepts the following extensions:
- CodecRegistrar to contribute Codecs for PostgreSQL ObjectIDs.
Extensions can be registered programmatically using PostgresqlConnectionConfiguration or discovered using Java’s ServiceLoader mechanism (from META-INF/services/io.r2dbc.postgresql.extension.Extension).
The driver ships with built-in dynamic codecs (e.g. hstore, PostGIS geometry) that are registered during the connection handshake depending on their availability while connecting. Note that Postgres extensions registered after a connection was established require a reconnect to initialize the codec.
If SLF4J is on the classpath, it will be used. Otherwise, there are two possible fallbacks: Console or java.util.logging.Logger. By default, the Console fallback is used. To use the JDK loggers, set the reactor.logging.fallback System property to JDK.
Logging facilities:
- Driver logging (io.r2dbc.postgresql)
- Query logging (io.r2dbc.postgresql.QUERY on DEBUG level)
- Parameter value logging (io.r2dbc.postgresql.PARAM on DEBUG level)
- Transport logging (io.r2dbc.postgresql.client)
  - DEBUG enables Message exchange logging
  - TRACE enables traffic logging
Logging that is associated with a connection reports the logical connection id (cid), which is a driver-local connection counter, and the Postgres Process Id (pid) once the connection handshake finishes.
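A minimal sketch of selecting the JDK fallback and opening up the query logger, using only java.util.logging. The property name and the logger name come from the text above. Treating DEBUG as Level.FINE is an assumption about how the fallback translates levels, and the class name LoggingSetup is hypothetical. The property has to be set before any logging is initialized by the driver or Reactor.

```java
import java.util.logging.ConsoleHandler;
import java.util.logging.Level;
import java.util.logging.Logger;

public class LoggingSetup {

    // Keep a strong reference: java.util.logging holds loggers weakly.
    static final Logger QUERY_LOGGER = Logger.getLogger("io.r2dbc.postgresql.QUERY");

    static void configure() {
        // Select the java.util.logging fallback instead of the Console fallback.
        System.setProperty("reactor.logging.fallback", "JDK");

        // Assumption: DEBUG corresponds to Level.FINE in java.util.logging.
        QUERY_LOGGER.setLevel(Level.FINE);

        // The default console handler filters at INFO, so attach one that lets FINE through
        // and stop forwarding to parent handlers to avoid duplicate lines.
        ConsoleHandler handler = new ConsoleHandler();
        handler.setLevel(Level.FINE);
        QUERY_LOGGER.addHandler(handler);
        QUERY_LOGGER.setUseParentHandlers(false);
    }

    public static void main(String[] args) {
        configure();
        System.out.println(System.getProperty("reactor.logging.fallback"));
    }
}
```

The same pattern applies to the other logger names listed above, for example io.r2dbc.postgresql.PARAM.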
Having trouble with R2DBC? We’d love to help!
R2DBC uses GitHub as its issue tracking system to record bugs and feature requests.
If you want to raise an issue, please follow the recommendations below:
You don’t need to build from source to use R2DBC PostgreSQL (binaries are in Maven Central), but if you want to try out the latest and greatest, R2DBC PostgreSQL can be easily built with the Maven wrapper. You also need JDK 1.8 and Docker to run integration tests.
$ ./mvnw clean install
If you want to build with the regular mvn command, you will need Maven v3.5.0 or above.
Also see CONTRIBUTING.adoc if you wish to submit pull requests.
Running the JMH benchmarks builds and runs the benchmarks without running tests.
$ ./mvnw clean install -Pjmh
This project is released under version 2.0 of the Apache License.