# HiveMQ MQTT Client
[](https://maven-badges.herokuapp.com/maven-central/com.hivemq/hivemq-mqtt-client)
[](https://javadoc.io/doc/com.hivemq/hivemq-mqtt-client)
[](https://img.shields.io/github/actions/workflow/status/hivemq/hivemq-mqtt-client/.github/workflows/check.yml?branch=master)
MQTT 5.0 and 3.1.1 compatible and feature-rich high-performance Java client library with different API flavours and
backpressure support.
- Documentation: https://hivemq.github.io/hivemq-mqtt-client/
- Community forum: https://community.hivemq.com/
- HiveMQ website: https://www.hivemq.com/
- Contribution guidelines: [CONTRIBUTING.md](CONTRIBUTING.md)
- License: [LICENSE](LICENSE)
- MQTT resources:
- [MQTT Essentials](https://www.hivemq.com/mqtt-essentials/)
- [MQTT 5 Essentials](https://www.hivemq.com/mqtt-5/)
## Features
- **All [MQTT 3.1.1](http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/errata01/os/mqtt-v3.1.1-errata01-os-complete.html) and
[MQTT 5.0](https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html) features**
- API flavors:
- **Reactive**: [Reactive Streams](https://www.reactive-streams.org/) compatible,
[RxJava](https://github.com/ReactiveX/RxJava) and [Reactor](https://github.com/reactor/reactor-core) APIs available
- **Asynchronous API**: futures and callbacks
- **Blocking API**: for quick start and testing
- Switch flexibly between flavours and use them concurrently
- Flavours are clearly separated but have a consistent API style
- **Backpressure support**:
- QoS 1 and 2
- QoS 0 (dropping incoming messages, if necessary)
- Bringing MQTT flow control and reactive pull backpressure together
- Transports:
- TCP
- **SSL/TLS**
- All TLS versions up to TLS 1.3 are supported
- TLS mutual authentication
- TLS Server Name Indication (SNI)
- TLS Session Resumption
- Default and customizable hostname verification
- **WebSocket**, Secure WebSocket
- **Proxy**: SOCKS4, SOCKS5, HTTP CONNECT
- All possible combinations
- Automatic and configurable **thread management**
- Automatic and configurable **reconnect handling and message redelivery**
- Automatic and configurable **resubscribe if the session expired**
- **Manual message acknowledgment**
- Selectively enable manual acknowledgment for specific streams
- Acknowledge messages that are emitted to multiple streams independently per stream
(the client aggregates the acknowledgments before sending MQTT acknowledgments)
- Order of manual acknowledgment does not matter
(the client automatically ensures the order of MQTT acknowledgments for 100% compatibility with the MQTT specification)
- Lifecycle listeners
- When connected
- When disconnected or connection failed
- MQTT 5 specific:
- Pluggable Enhanced Authentication support (additional to MQTT specification: server-triggered re-authentication)
- Automatic Topic Alias mapping
- Interceptors for QoS flows
## Users
[

](https://github.com/bmwcarit) [

](https://github.com/bmwcarit/joynr) [

](https://www.openhab.org/) [

](https://github.com/eclipse/ditto) [

](https://github.com/OSGP/open-smart-grid-platform) [

](https://github.com/EXXETA/correomqtt) [

](https://github.com/SmartsquareGmbH/mqtt-starter) [

](https://www.openmuc.org/) [

](https://github.com/leftshiftone/gaia-sdk)
If you use the HiveMQ MQTT Client in a project that is not listed here, feel free to open an issue or pull request.
## How to use
Java 8 or higher is required.
### Dependency
#### Gradle
If you use Gradle, just include the following inside your `build.gradle(.kts)` file.
dependencies {
implementation("com.hivemq:hivemq-mqtt-client:1.3.14")
}
For optional features you can choose to include additional modules:
dependencies {
implementation(platform("com.hivemq:hivemq-mqtt-client-websocket:1.3.14"))
implementation(platform("com.hivemq:hivemq-mqtt-client-proxy:1.3.14"))
implementation(platform("com.hivemq:hivemq-mqtt-client-epoll:1.3.14"))
implementation("com.hivemq:hivemq-mqtt-client-reactor:1.3.14")
}
#### Maven
If you use Maven, just include the following inside your `pom.xml` file.
...
com.hivemq
hivemq-mqtt-client
1.3.14
...
NOTE: You have to set the compiler version to `1.8` or higher.
...
1.8
1.8
...
For optional features you can choose to include additional modules:
...
com.hivemq
hivemq-mqtt-client-websocket
1.3.14
pom
com.hivemq
hivemq-mqtt-client-proxy
1.3.14
pom
com.hivemq
hivemq-mqtt-client-epoll
1.3.14
pom
com.hivemq
hivemq-mqtt-client-reactor
1.3.14
...
### Shaded version
If you are experiencing problems with transitive dependencies, you can try the shaded version.
This version packs the transitive dependencies which are only used internal under a different package name.
The shaded version includes the websocket, proxy and epoll modules.
To use the shaded version just append `-shaded` to the artifact name.
#### Gradle
dependencies {
implementation("com.hivemq:hivemq-mqtt-client-shaded:1.3.14")
}
#### Maven
...
com.hivemq
hivemq-mqtt-client-shaded
1.3.14
...
### Snapshots
Snapshots can be obtained using [JitPack](https://jitpack.io/#hivemq/hivemq-mqtt-client).
#### Gradle
repositories {
...
maven { url 'https://jitpack.io' }
}
dependencies {
implementation("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client:develop-SNAPSHOT")
// snapshots for optional modules
implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-websocket:develop-SNAPSHOT"))
implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-proxy:develop-SNAPSHOT"))
implementation(platform("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-epoll:develop-SNAPSHOT"))
implementation("com.github.hivemq.hivemq-mqtt-client:hivemq-mqtt-client-reactor:develop-SNAPSHOT")
}
#### Maven
...
jitpack.io
https://jitpack.io
com.github.hivemq.hivemq-mqtt-client
hivemq-mqtt-client
develop-SNAPSHOT
com.github.hivemq.hivemq-mqtt-client
hivemq-mqtt-client-websocket
develop-SNAPSHOT
pom
com.github.hivemq.hivemq-mqtt-client
hivemq-mqtt-client-proxy
develop-SNAPSHOT
pom
com.github.hivemq.hivemq-mqtt-client
hivemq-mqtt-client-epoll
develop-SNAPSHOT
pom
com.github.hivemq.hivemq-mqtt-client
hivemq-mqtt-client-reactor
develop-SNAPSHOT
...
Change the artifact name to `hivemq-mqtt-client-shaded` to get snapshots of the shaded version.
JitPack works for all branches and also specific commits.
Just specify `
-SNAPSHOT` or the first 10 digits of the commit id in the version.
## General principles
- API and implementation are clearly separated. All classes inside `internal` packages must not be used directly.
- The API is mostly fluent and uses fluent builders to create clients, configurations and messages.
- The API is designed to be consistent:
- The same principles are used throughout the library.
- The MQTT 3 and 5 interfaces are as consistent as possible with only version-specific differences.
## Creation of clients
Base classes: `Mqtt3Client`, `Mqtt5Client`
Mqtt5Client client = MqttClient.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.useMqttVersion5()
.build();
Mqtt3Client client = MqttClient.builder()...useMqttVersion3().build();
Or if the version is known upfront:
Mqtt5Client client = Mqtt5Client.builder()...build();
Mqtt3Client client = Mqtt3Client.builder()...build();
For each API style exists a specific `build...()` method.
## API flavours
Each API style has its own interface to separate them clearly.
At any time it is possible to switch the API style.
### Blocking API
- Builder method: `buildBlocking()`
- Switch method: `client.toBlocking()`
#### Examples
##### Publish example
Mqtt5BlockingClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildBlocking();
client.connect();
client.publishWith().topic("test/topic").qos(MqttQos.AT_LEAST_ONCE).payload("1".getBytes()).send();
client.disconnect();
#### Connect
client.connect();
Or with customized properties of the Connect message:
client.connectWith().keepAlive(10).send();
Or with pre-built Connect message:
Mqtt5Connect connectMessage = Mqtt5Connect.builder().keepAlive(10).build();
client.connect(connectMessage);
#### Publish
client.publishWith()
.topic("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.payload("payload".getBytes())
.send();
Or with pre-built Publish message:
Mqtt5Publish publishMessage = Mqtt5Publish.builder()
.topic("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.payload("payload".getBytes())
.build();
client.publish(publishMessage);
#### Consume messages
try (Mqtt5BlockingClient.Mqtt5Publishes publishes = client.publishes(MqttGlobalPublishFilter.ALL)) {
Mqtt5Publish publishMessage = publishes.receive();
// or with timeout
Optional publishMessage = publishes.receive(10, TimeUnit.SECONDS);
// or without blocking
Optional publishMessage = publishes.receiveNow();
}
#### Disconnect
client.disconnect();
Or with customized properties of the DISCONNECT message (only MQTT 5):
client.disconnectWith().reasonString("test").send();
Or with pre-built Disconnect message (only MQTT 5):
Mqtt5Disconnect disconnectMessage = Mqtt5Disconnect.builder().reasonString("test").build();
client.disconnect(disconnectMessage);
#### Reauth (only MQTT 5)
client.reauth();
### Async API
- Builder method: `buildAsync()`
- Switch method: `client.toAsync()`
#### Examples
##### Publish example
Mqtt5AsyncClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildAsync();
client.connect()
.thenCompose(connAck -> client.publishWith().topic("test/topic").payload("1".getBytes()).send())
.thenCompose(publishResult -> client.disconnect());
#### Connect
`connect()`, `connectWith()` and `connect(Mqtt3/5Connect)` method calls are analog to the Blocking API but return
`CompletableFuture`.
#### Publish
`publishWith()` and `publish(Mqtt3/5Publish)` method calls are analog to the Blocking API but return
`CompletableFuture`.
#### Consume messages
client.publishes(MqttGlobalPublishFilter.ALL, System.out::println);
Or with executing the callback on a specified executor:
client.publishes(MqttGlobalPublishFilter.ALL, System.out::println, executor);
#### Disconnect
`disconnect()`, `disconnectWith()` and `disconnect(Mqtt5Disconnect)` method calls are analog to the Blocking API but
return `CompletableFuture`.
#### Reauth (only MQTT 5)
`reauth()` method call is analog to the Blocking API but returns `CompletableFuture`.
### Reactive API
- Builder method: `buildRx()`
- Switch method: `client.toRx()`
#### Examples
##### Publish example
Mqtt5RxClient client = Mqtt5Client.builder()
.identifier(UUID.randomUUID().toString())
.serverHost("broker.hivemq.com")
.buildRx();
// As we use the reactive API, the following line does not connect yet, but returns a reactive type.
Completable connectScenario = client.connect()
.doOnSuccess(connAck -> System.out.println("Connected, " + connAck.getReasonCode()))
.doOnError(throwable -> System.out.println("Connection failed, " + throwable.getMessage()))
.ignoreElement();
// Fake a stream of Publish messages with an incrementing number in the payload
Flowable messagesToPublish = Flowable.range(0, 10_000)
.map(i -> Mqtt5Publish.builder()
.topic("a/b/c")
.qos(MqttQos.AT_LEAST_ONCE)
.payload(("test " + i).getBytes())
.build())
// Emit 1 message only every 100 milliseconds
.zipWith(Flowable.interval(100, TimeUnit.MILLISECONDS), (publish, i) -> publish);
// As we use the reactive API, the following line does not publish yet, but returns a reactive type.
Completable publishScenario = client.publish(messagesToPublish)
.doOnNext(publishResult -> System.out.println(
"Publish acknowledged: " + new String(publishResult.getPublish().getPayloadAsBytes())))
.ignoreElements();
// As we use the reactive API, the following line does not disconnect yet, but returns a reactive type.
Completable disconnectScenario = client.disconnect().doOnComplete(() -> System.out.println("Disconnected"));
// Reactive types can be easily and flexibly combined
connectScenario.andThen(publishScenario).andThen(disconnectScenario).blockingAwait();
#### Connect
`connect()`, `connectWith()` and `connect(Mqtt3/5Connect)` method calls are analog to the Async and Blocking API but
return `Single`.
#### Publish
`publish` takes a reactive stream of Publish messages (`Flowable`) and returns a reactive stream of Publish results
(`Flowable`).
The Reactive API is usually not used for publishing single messages.
Nevertheless it is possible with the following code.
Single result =
client.publish(Flowable.just(Mqtt5Publish.builder()
.topic("test/topic")
.qos(MqttQos.AT_LEAST_ONCE)
.payload("payload".getBytes())
.build())).singleOrError();
#### Consume messages
Flowable result =
client.publishes(MqttGlobalPublishFilter.ALL).doOnNext(System.out::println);
#### Disconnect
`disconnect()`, `disconnectWith()` and `disconnect(Mqtt5Disconnect)` method calls are analog to the Async and Blocking
API but return `Completable`.
#### Reauth (only MQTT 5)
`reauth()` method call is analog to the Async and Blocking API but returns `Completable`.
## Versioning
[Semantic Versioning](https://semver.org/) is used.
All code inside `com.hivemq.client.internal` packages must not be used directly. It can change at any time and is not
part of the public API.
Interfaces annotated with `DoNotImplement` must not be implemented. The implementation is provided by the library.
This allows the library to later add methods to the interface without breaking backwards compatibility with implementing
classes.
## License
HiveMQ MQTT Client is licensed under the `APACHE LICENSE, VERSION 2.0`. A copy of the license can be found [here](LICENSE).