Breaker
A Breaker is short for Circuit Breaker. The idea behind the breaker is to wrap access to a service so that errors can be tracked and the circuit breaker can open if the allowed number of errors is exceeded. Like all things in Reakt, there is an interface for Breaker that defines a contract, but other implementations can get creative in how they detect that the Breaker has been thrown.
/**
 * Reference to the db session, which gets connected asynchronously.
 */
private Breaker<Session> sessionBreaker = Breaker.broken();

//.. async connect to db and monitor connection health...

private synchronized Breaker<Session> sessionBreaker() {
    return sessionBreaker;
}
...
sessionBreaker()
    .ifOk(session -> {
        final ResultSetFuture resultSetFuture = session.executeAsync(
            QueryBuilder.insertInto(RESULTS_TABLE)
                .value("sequence", sequence.incrementAndGet())
                .value("rawData", snapshot.getSnapshot())
        );
        createPromiseFromResultSetFutureForStore(resultSetFuture, "Storing results snapshot");
    })
    .ifBroken(() -> callback.reject("Database connection is down, cannot store snapshot at the moment"));
Then you can add code to recover from this state using the reactor.
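One way to picture the recovery step: a periodic health check looks at the breaker and, if it is broken, tries to reconnect and swaps in an operational breaker. The sketch below is self-contained and does not use Reakt. `MiniBreaker`, `SessionHolder`, and the `connector` supplier are hypothetical stand-ins, and `healthCheck()` is called by hand where a reactor or scheduler would normally call it on a timer.

```java
import java.util.Objects;
import java.util.function.Consumer;
import java.util.function.Supplier;

public class BreakerRecoverySketch {

    /** Minimal stand-in for the Breaker idea (hypothetical, not the Reakt type). */
    static final class MiniBreaker<T> {
        private final T service; // null means broken

        private MiniBreaker(T service) { this.service = service; }

        static <T> MiniBreaker<T> broken() { return new MiniBreaker<>(null); }

        static <T> MiniBreaker<T> operational(T service) {
            return new MiniBreaker<>(Objects.requireNonNull(service));
        }

        boolean isBroken() { return service == null; }

        MiniBreaker<T> ifOk(Consumer<? super T> consumer) {
            if (service != null) consumer.accept(service);
            return this;
        }

        MiniBreaker<T> ifBroken(Runnable runnable) {
            if (service == null) runnable.run();
            return this;
        }
    }

    /** Holds the current breaker and replaces it when a reconnect succeeds. */
    static final class SessionHolder<T> {
        // Assumption: the connector returns null when the connection attempt fails.
        private final Supplier<T> connector;
        private MiniBreaker<T> breaker = MiniBreaker.broken();

        SessionHolder(Supplier<T> connector) { this.connector = connector; }

        synchronized MiniBreaker<T> breaker() { return breaker; }

        /** Only attempts a reconnect while the breaker is broken. */
        synchronized void healthCheck() {
            breaker.ifBroken(() -> {
                T session = connector.get();
                if (session != null) breaker = MiniBreaker.operational(session);
            });
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        // Hypothetical connector: fails on the first attempt, succeeds on the second.
        SessionHolder<String> holder =
                new SessionHolder<>(() -> ++attempts[0] < 2 ? null : "session");

        holder.healthCheck();
        holder.breaker().ifBroken(() -> System.out.println("still down"));     // prints "still down"

        holder.healthCheck();
        holder.breaker().ifOk(s -> System.out.println("connected: " + s));     // prints "connected: session"
    }
}
```

Callers always go through `breaker()`, so they see the replacement as soon as the health check succeeds and never hold a stale session reference.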
Here is a simple test to show how a broken breaker works.
AtomicBoolean okCalled = new AtomicBoolean();
AtomicBoolean brokenCalled = new AtomicBoolean();
final Breaker<Object> broken = Breaker.broken();
broken.ifOk(o -> okCalled.set(true));
broken.ifBroken(() -> brokenCalled.set(true));
assertFalse(okCalled.get());
assertTrue(broken.isBroken());
assertTrue(brokenCalled.get());
assertFalse(broken.isOk());
broken.cleanup(o -> {
});
Here is a simple test to show how an operational breaker works.
AtomicBoolean okCalled = new AtomicBoolean();
AtomicBoolean brokenCalled = new AtomicBoolean();
final Breaker<Object> ok = Breaker.operational(new Object());
ok.ifOk(o -> okCalled.set(true));
ok.ifBroken(() -> brokenCalled.set(true));
assertEquals(0, ok.errorCount());
assertTrue(okCalled.get());
assertFalse(ok.isBroken());
assertTrue(ok.isOk());
try {
    ok.ifOk(o -> {
        throw new IllegalStateException("ack");
    });
    fail();
} catch (Exception ex) {
}
assertEquals(1, ok.errorCount());
The idea is that there can be context-aware service wrappers for certain types of breaker that can detect when or if the breaker is open.
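As an illustration of a context-aware wrapper, the sketch below opens the circuit based on the state of the wrapped service itself rather than on an error count. It is an assumption about how such a wrapper might look, not something confirmed by the interface listed on this page. `PredicateBreaker` and `FakeConnection` are hypothetical names.

```java
import java.util.function.Consumer;
import java.util.function.Predicate;

public class PredicateBreakerSketch {

    /** Hypothetical service with an observable health flag. */
    static final class FakeConnection {
        private volatile boolean closed;

        void close() { closed = true; }

        boolean isClosed() { return closed; }
    }

    /** Breaker-like wrapper whose state is derived from a predicate on the service. */
    static final class PredicateBreaker<T> {
        private final T service;
        private final Predicate<? super T> blown; // returns true when the circuit should be open

        PredicateBreaker(T service, Predicate<? super T> blown) {
            this.service = service;
            this.blown = blown;
        }

        boolean isOk() { return service != null && !blown.test(service); }

        boolean isBroken() { return !isOk(); }

        PredicateBreaker<T> ifOk(Consumer<? super T> consumer) {
            if (isOk()) consumer.accept(service);
            return this;
        }

        PredicateBreaker<T> ifBroken(Runnable runnable) {
            if (isBroken()) runnable.run();
            return this;
        }
    }

    public static void main(String[] args) {
        FakeConnection connection = new FakeConnection();
        PredicateBreaker<FakeConnection> breaker =
                new PredicateBreaker<>(connection, FakeConnection::isClosed);

        breaker.ifOk(c -> System.out.println("connection usable"));            // prints "connection usable"
        connection.close();
        breaker.ifBroken(() -> System.out.println("connection closed"));       // prints "connection closed"
    }
}
```

Because the predicate is evaluated on every call, the breaker reflects the service's current state without anyone having to flip it explicitly.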
You can also specify a limit of allowed errors as follows:
AtomicBoolean okCalled = new AtomicBoolean();
AtomicBoolean brokenCalled = new AtomicBoolean();
final Breaker<Object> ok = Breaker.operational(new Object(), 10);
ok.ifOk(o -> okCalled.set(true));
ok.ifBroken(() -> brokenCalled.set(true));
assertEquals(0, ok.errorCount());
assertTrue(okCalled.get());
assertFalse(ok.isBroken());
assertTrue(ok.isOk());
try {
    ok.ifOk(o -> {
        throw new IllegalStateException("ack");
    });
    fail();
} catch (Exception ex) {
}
assertEquals(1, ok.errorCount());
assertFalse(ok.isBroken());
for (int index = 0; index < 20; index++) {
    try {
        ok.ifOk(o -> {
            throw new IllegalStateException("ack");
        });
    } catch (Exception ex) {
    }
}
assertEquals(10, ok.errorCount());
assertTrue(ok.isBroken());
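The test above implies that once the error limit is reached, the consumer is no longer invoked, so the count stops at the limit. A minimal sketch of how that behavior could be implemented follows. `CountingBreaker` is a hypothetical stand-in written for this page, not Reakt's `BreakerImpl`, and treating a limit of `0` as "no limit" is an assumption.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;

public class CountingBreakerSketch {

    /** Hypothetical breaker that opens after maxErrors failed calls. */
    static final class CountingBreaker<T> {
        private final T service;
        private final long maxErrors; // assumption: 0 means "no limit"
        private final AtomicLong errors = new AtomicLong();

        CountingBreaker(T service, long maxErrors) {
            this.service = service;
            this.maxErrors = maxErrors;
        }

        boolean isOk() {
            return service != null && (maxErrors == 0 || errors.get() < maxErrors);
        }

        boolean isBroken() { return !isOk(); }

        long errorCount() { return errors.get(); }

        /** Runs the consumer only while the circuit is closed; counts and rethrows failures. */
        CountingBreaker<T> ifOk(Consumer<? super T> consumer) {
            if (isOk()) {
                try {
                    consumer.accept(service);
                } catch (RuntimeException ex) {
                    errors.incrementAndGet();
                    throw ex;
                }
            }
            return this;
        }

        CountingBreaker<T> ifBroken(Runnable runnable) {
            if (isBroken()) runnable.run();
            return this;
        }
    }

    public static void main(String[] args) {
        CountingBreaker<String> breaker = new CountingBreaker<>("service", 3);
        for (int i = 0; i < 5; i++) {
            try {
                breaker.ifOk(s -> { throw new IllegalStateException("ack"); });
            } catch (IllegalStateException expected) {
                // the failure was counted by the breaker
            }
        }
        // Only the first three calls reach the consumer; the last two are skipped.
        System.out.println(breaker.errorCount() + " errors, broken=" + breaker.isBroken()); // prints "3 errors, broken=true"
    }
}
```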
A fuller example is coming. We are still refining the Breaker interface. We found that we needed this pattern a lot, but were using the Expected interface (and before that the Optional interface) to do what we now do with the Breaker.
Here is the full API for the Breaker interface.
package io.advantageous.reakt;
import io.advantageous.reakt.impl.BreakerImpl;
import java.util.function.Consumer;
/**
* Represents a Circuit Breaker.
* The contained service can be broken (open circuit) or operational (closed circuit).
* <p>
* This represents a service which may or may not be available.
* </p>
* We were using Expected a lot where we really wanted something like a Breaker.
* <p>
* This could be extended to blow the circuit with different conditions by providing
* your own Breaker.
* </p>
* <p>
* Also we want to use interfaces for all core concepts.
* </p>
* In addition we wanted callback for ifBroken and ifOperational.
* <p>
* If a service is active and healthy, {@code isOperational()} will return {@code true}.
* If a service is not healthy or not working, {@code isBroken()} will return {@code true}.
* </p>
* This is heavily modeled after {@code Expected} optional.
*/
public interface Breaker<T> {
/**
* Common instance for {@code broken()}.
*/
Breaker OPENED = new BreakerImpl<>();
/**
* Returns an empty {@code Breaker} instance. No service is present for this
* value.
*
* @param <T> Type of the non-existent value
* @return a broken {@code Breaker}
*/
static <T> Breaker<T> broken() {
@SuppressWarnings("unchecked")
Breaker<T> t = OPENED;
return t;
}
/**
* Returns a {@code Breaker} using the specified present value, which must not be null.
*
* @param <T> the class of the value
* @param value the value to be present. Must be non-null
* @return an operational {@code Breaker} with the value present
* @throws NullPointerException if value is null
*/
static <T> Breaker<T> operational(T value) {
return new BreakerImpl<>(value);
}
/**
* Returns a {@code Breaker} using the specified present value, which must not be null.
*
* @param <T> the class of the value
* @param value the value to be present. Must be non-null
* @param maxErrorsCount max error count
* @return an operational {@code Breaker} with the value present
* @throws NullPointerException if value is null
*/
static <T> Breaker<T> operational(T value, final int maxErrorsCount) {
return new BreakerImpl<>(value, maxErrorsCount);
}
/**
* Return {@code true} if the service is broken, otherwise {@code false}.
*
* @return {@code true} if the service is broken, otherwise {@code false}
*/
boolean isBroken();
/**
* Return {@code true} if the service is working, otherwise {@code false}.
*
* @return {@code true} if the service is working, otherwise {@code false}.
*/
boolean isOperational();
/**
* Short version of isOperational.
*
* @return ok
*/
default boolean isOk() {
return isOperational();
}
/**
* If a service is believed to be working, invoke the consumer with the value.
* <p>
* This tracks errors thrown by the consumer.
*
* @param consumer executed if a value is present
* @return this, fluent API
* @throws NullPointerException if value is present and {@code consumer} is
* null
*/
Breaker<T> ifOperational(Consumer<? super T> consumer);
/**
* Short version of ifOperational.
* If a service is believed to be working, invoke the consumer with the value.
*
* @param consumer executed if a value is present
* @return this, fluent API
* @throws NullPointerException if value is present and {@code consumer} is
* null
*/
default Breaker<T> ifOk(Consumer<? super T> consumer) {
return ifOperational(consumer);
}
/**
* If a service is broken, invoke the runnable.
*
* @param runnable executed if a value is not present
* @return this, fluent API
*/
Breaker<T> ifBroken(Runnable runnable);
/**
* If a service is broken but present, invoke the consumer.
* This is used to do clean up, like closing a connection.
*
* @param consumer executed if a value is not present
* @return this, fluent API
*/
Breaker<T> cleanup(Consumer<? super T> consumer);
/**
* @return number of errors detected.
*/
long errorCount();
}