Sunday, August 13, 2023

Micronaut Series: How to write and test HTTP REST Clients

Creating a Micronaut project

For the purposes of demonstrating the concepts we are going to create a Micronaut Java application that exposes a simple REST service.
To save some typing, head over to the Micronaut Launch site and choose:

Application Type: Micronaut Application
Micronaut Version: 4.0.3
Language: Java , Version: 17
Build Tool: Maven
Test Framework: JUnit
Included Features: http-client, reactor, awaitility, annotation-api, reactor-http-client, serialization-jackson
Generate project with Launch



REST Controller

Our PersonController will serve REST resources as follows:

URI             Verb    Purpose
/person/{id}    GET     gets one Person by ID
/person/all     GET     gets all Person items
/person         POST    creates a new Person

Here's the class definition:
@Controller("/person")
public class PersonController {

    @Inject
    PersonService personService;

    @Post
    public Person add(@Body Person person) {
        return personService.add(person);
    }

    @Get("/{id}")
    public Optional<Person> findById(@PathVariable Integer id) {
        return personService.getPersonList().stream()
                .filter(person -> person.id().equals(id))
                .findFirst();
    }

    @Get(value = "all", produces = MediaType.APPLICATION_JSON)
    public List<Person> findAll() {
        return personService.getPersonList();
    }
}
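The controller delegates to a PersonService backed by an in-memory list, which is not shown above. Here is a minimal sketch of what the Person record and the service might look like; the record components, the seed names, and the id-assignment scheme are my assumptions, inferred from the calls person.id(), personService.add(...) and personService.getPersonList() and from the tests further down (two pre-seeded people, so a newly added person gets ID 3):

```java
import java.util.ArrayList;
import java.util.List;

// Assumed shape of the domain type: id, name, age (matches person.id() and the test data below).
record Person(Integer id, String name, int age) {}

// Minimal in-memory store; in the Micronaut app this class would carry @jakarta.inject.Singleton.
class PersonService {
    // Two pre-seeded entries (names are made up), so the next assigned ID is 3.
    private final List<Person> people = new ArrayList<>(List.of(
            new Person(1, "Ada Lovelace", 36),
            new Person(2, "Alan Turing", 41)));

    public Person add(Person person) {
        // Ignore the incoming (null) id and assign the next one.
        Person saved = new Person(people.size() + 1, person.name(), person.age());
        people.add(saved);
        return saved;
    }

    public List<Person> getPersonList() {
        return people;
    }
}
```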

Declarative clients

Declarative HTTP clients are introduced by the annotation @Client on either an interface or an abstract class. For the purposes of this demo I am going to use it with an interface PersonClient:
@Client("/person")
public interface PersonClient {

    @Post
    Person add(@Body Person person);

    @Get("{id}")
    Optional<Person> findById(@PathVariable Integer id);

    @Get(value = "all", consumes = MediaType.APPLICATION_JSON)
    List<Person> findAll();
}

Micronaut will create the client implementation for me using some fairy AOP magic dust. The JUnit Micronaut test is pretty straightforward:
@MicronautTest // (1)
public class PersonControllerTest {

    @Inject
    EmbeddedServer server; // (2)

    @Inject
    PersonClient client; // (3)

    @Test
    public void testAddDeclarative() {
        final Person person = new Person(null, "First Last", 22);
        Person s = client.add(person);
        assertThat(s.id(), is(3)); // (4)
    }

    @Test
    void testFindByIdDeclarative() {
        Optional<Person> optionalPerson = client.findById(1);
        assertThat(optionalPerson.isPresent(), is(true));
    }

    @Test
    void testFindAllStreamDeclarative() {
        List<Person> list = client.findAll();
        assertThat(list, hasSize(2));
    }
}

Explanations:
  1. The annotation @MicronautTest adds the necessary wiring so that our test knows about the Micronaut application context and allows us to use injection.
  2. Injects an instance of our application as an embedded server.
  3. Injects the HTTP client created at compilation time using AOP.
  4. Asserts that the newly added person has ID 3, because there are already two people in the store.

Using Reactive processing

If we want to use reactive processing we need to make some adjustments. First our controller becomes:
@Controller("/personReactive")
public class PersonReactiveController {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(PersonReactiveController.class);

    @Inject
    PersonService personService;

    @Post
    public Mono<Person> add(@Body Person person) {
        Person newPerson = personService.add(person);
        return Mono.just(newPerson); // (1)
    }

    @Get("/{id}")
    public Publisher<Optional<Person>> findById(@PathVariable Integer id) {
        return Publishers.just(personService.getPersonList()
                .stream()
                .filter(person -> person.id().equals(id))
                .findAny()
        ); // (2)
    }

    @Get(value = "stream", produces = MediaType.APPLICATION_JSON_STREAM)
    public Flux<Person> findAllStream() {
        return Flux.fromIterable(personService.getPersonList())
                .doOnNext(person -> LOGGER.info("Server: {}", person)); // (3)
    }
}
Then our reactive declarative client:
@Client("/personReactive")
public interface PersonReactiveClient {

    @Post
    Mono<Person> add(@Body Person person);

    @Get("{id}")
    Publisher<Person> findById(@PathVariable Integer id);

    @Get(value = "stream", consumes = MediaType.APPLICATION_JSON_STREAM)
    Flux<Person> findAllStream();
}
Explanations:
For both server and client I am using Reactive Streams Specification and project-reactor.
  1.  Returning a Mono<Person> (import reactor.core.publisher.Mono) is the most appropriate thing to do here, since Mono is a Publisher that emits at most one value.
  2. Here the return type indicates that the publisher may emit an unbounded number of Optional<Person> elements. I could instead have used Mono<Person> or Mono<Optional<Person>>, since findById is supposed to return a single value (or nothing). (import org.reactivestreams.Publisher)
  3. Flux is a variant of Publisher that emits 0 to N elements and then it completes (successfully or not). (import reactor.core.publisher.Flux) 

Testing reactive client

Testing reactive code becomes a tad more complex, since we are now dealing with an asynchronous flow. Luckily, Awaitility comes to the rescue:
@MicronautTest
public class PersonReactiveControllerTest {

    private static final Logger LOGGER = LoggerFactory
            .getLogger(PersonReactiveControllerTest.class);

    @Inject
    EmbeddedServer server;

    @Inject
    PersonReactiveClient client;

    @Test
    public void testAddDeclarative() {
        final AtomicBoolean flip = new AtomicBoolean(false);
        final Person person = new Person(null, "First Last", 22);
        Mono<Person> s = client.add(person);
        s.subscribe(person1 -> {
            LOGGER.info("Added: {}", person1);
            assertThat(person1.id(), is(3));
            flip.set(true);
        });
        await().atMost(200, TimeUnit.MILLISECONDS).until(flip::get); // (1)
    }

    @Test
    void testFindByIdDeclarative() {
        final AtomicBoolean flip = new AtomicBoolean(false);
        Publisher<Person> publisher = client.findById(1);
        publisher.subscribe(new Subscriber<Person>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(1); // (2)
            }

            @Override
            public void onNext(Person person) {
                LOGGER.info("Received a person: {}", person);
                assertThat(person.id(), is(1)); // (3)
            }

            @Override
            public void onError(Throwable t) {
                LOGGER.error("Something went wrong", t);
            }

            @Override
            public void onComplete() {
                flip.set(true); // (4)
            }
        });
        await().atMost(300, TimeUnit.MILLISECONDS).until(flip::get);
    }

    // other methods omitted for brevity
}
Explanations:
  1. Here I am using await() to wait until flip gets flipped to true (this occurs when the subscriber lambda receives the emitted Person). The waiting time is bounded by the atMost call (200 milliseconds).
  2. The anonymous subscriber requests a new item from the publisher upon receiving the onSubscribe event.
  3. The subscriber receives the onNext event with the Person emitted by the publisher; I am asserting that the Person's ID is in fact "1", the same value as requested by the call "findById(1)".
  4. Upon stream completion the subscriber gets the onComplete event; here I am setting flip to true so await() can return and the test can finish.
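The onSubscribe/request/onNext/onComplete handshake used in the test above is the Reactive Streams protocol that Reactor implements. As a dependency-free illustration of the same flow, here is a sketch using the JDK's own java.util.concurrent.Flow interfaces (which mirror org.reactivestreams); the SubmissionPublisher and the countItems helper are mine and merely stand in for the HTTP client's publisher:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class FlowDemo {

    // Subscribes to a publisher of two items and counts what arrives.
    public static int countItems() throws InterruptedException {
        CountDownLatch done = new CountDownLatch(1); // plays the role of the flip flag
        AtomicInteger received = new AtomicInteger();

        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(new Flow.Subscriber<String>() {
                private Flow.Subscription subscription;

                @Override
                public void onSubscribe(Flow.Subscription s) {
                    subscription = s;
                    s.request(1); // ask for the first item
                }

                @Override
                public void onNext(String item) {
                    received.incrementAndGet();
                    subscription.request(1); // ask for the next one
                }

                @Override
                public void onError(Throwable t) {
                    done.countDown();
                }

                @Override
                public void onComplete() {
                    done.countDown(); // stream finished; release the waiter
                }
            });
            publisher.submit("one");
            publisher.submit("two");
        } // close() signals onComplete after pending items are delivered

        done.await(1, TimeUnit.SECONDS);
        return received.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(countItems());
    }
}
```

The CountDownLatch fills the same role Awaitility's await() fills in the test: it blocks the test thread until the asynchronous onComplete callback fires.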

Conclusion

This article hopefully demonstrated how to write and test declarative HTTP clients in a Micronaut application.
The project is available on GitHub

Friday, October 12, 2007

Broken locks

In one of my recent projects I came across this scenario:
On one *nix machine there are two processes: a non-Java process and a Java application. The (external) process writes several text files to "well-known" locations (typically each file is put in its own directory). The file names are also known in advance to a certain degree (a name pattern may be specified instead of the complete name).
The files are written on a regular basis (daily or hourly), but the exact timing is not known.
The Java application has to:
  • continually poll these directories and look for any new file(s) using the provided name or file pattern;
  • as soon as it detects a new file, read and "process" it (the exact nature of the processing is unimportant);
  • after the processing is done, move the file to another directory -- mainly for archival purposes.

A number of questions appeared:

  1. What exactly does "new file [version]" mean?
  2. How do we know when the external process has finished writing the input file? Suppose the external process starts writing the file. Should the Java process wait for the writer to finish before it starts reading? If yes, for how long? [Perhaps one should ask: is the reader even required to wait for the writer to finish?]
  3. How do we prevent the external process from starting a new write of the input file while our Java process is in the middle of reading it?
  4. How do we make sure the Java process won't move the file (once it is done with it) while the writer process is writing a new version of it?

Question 1 is a matter of comparing file timestamps; it should be enough to compare the creation/modification date with the previously seen value. If the date is greater, we have a new file.

Question 2 was answered by comparing two successive samples of the file length; if they match, one can assume that writing has ended. The interval between the two measurements was established empirically by running the program on various input files. If the two length values still differ, the Java process (thread) sleeps and then tries again.
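The two heuristics above can be sketched in a few lines; this is an illustrative reconstruction, not the project's actual code, and the polling interval is a placeholder to be tuned empirically, as noted above:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileTime;

public class FileWatch {

    // Question 1: a file is "new" when its modification time has moved past the last one we saw.
    public static boolean isNewVersion(Path file, FileTime lastSeen) throws IOException {
        return Files.getLastModifiedTime(file).compareTo(lastSeen) > 0;
    }

    // Question 2: assume the writer has finished when two successive length samples match.
    public static boolean writeFinished(Path file, long intervalMillis)
            throws IOException, InterruptedException {
        long before = Files.size(file);
        Thread.sleep(intervalMillis); // interval chosen empirically in the real project
        long after = Files.size(file);
        return before == after;
    }
}
```

Note that both checks are heuristics: a slow writer that pauses longer than the sampling interval will be misdetected, which is part of why questions 3 and 4 need a real synchronization protocol.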

To answer questions 3 and 4 successfully some sort of access protocol must be established between the involved parties, because the file moving requirement makes the Java process a "writer" too. Therefore the processes must synchronize on the shared resource. This -I think- is a typical problem that IPC mechanisms ([FIFO]pipes, shared memory, sockets) solve.

Unfortunately, none of the above IPC mechanisms was adopted (possibly for lack of time or knowledge). Instead the team chose file locks, as these would prevent -they thought- the input file(s) from being clobbered by multiple processes. Moreover, because nobody knew at the time what the writer process(es) would be, only the Java program was considered. So the Java program had to do the following:

  1. Try to obtain exclusive access (an exclusive file lock) to the input file;
  2. IF step 1 succeeds, read the file and do something with the data (unimportant for this discussion), ELSE try later (sleep a certain amount of time);
  3. AFTER step 2 completes, try to move the input file to a back-up directory;
  4. Relinquish the exclusive lock on the file, allowing other process(es) to write to it.
[IMPORTANT ASSUMPTION: no other process can write to the file while the Java process is holding an exclusive lock]

The above scheme was implemented and tested on a Windows machine. The tests were successful; everything seemed to work well. BUT as soon as the program was deployed on the *nix machine, weird things started to happen. The "writer" process happily wrote to the file while the "reader" read it - even though the file lock was held by the Java process!

Reality check: The assumption does not hold on *nix machines!

Read what the FileLock javadoc has to say:
"This file-locking API is intended to map directly to the native locking facility of the underlying operating system. Thus the locks held on a file should be visible to all programs that have access to the file, regardless of the language in which those programs are written.

Whether or not a lock actually prevents another program from accessing the content of the locked region is system-dependent and therefore unspecified. The native file-locking facilities of some systems are merely advisory, meaning that programs must cooperatively observe a known locking protocol in order to guarantee data integrity [my emphasis - this is the case (of some?) of Unix platforms]. On other systems native file locks are mandatory, meaning that if one program locks a region of a file then other programs are actually prevented from accessing that region in a way that would violate the lock. On yet other systems, whether native file locks are advisory or mandatory is configurable on a per-file basis. To ensure consistent and correct behavior across platforms, it is strongly recommended that the locks provided by this API be used as if they were advisory locks.

On some systems, acquiring a mandatory lock on a region of a file prevents that region from being mapped into memory, and vice versa. Programs that combine locking and mapping should be prepared for this combination to fail.

On some systems, closing a channel releases all locks held by the Java virtual machine on the underlying file regardless of whether the locks were acquired via that channel or via another channel open on the same file. It is strongly recommended that, within a program, a unique channel be used to acquire all locks on any given file."

I wrote a small Java program that uses file locks and tested it on an AIX machine. The program reads a text file line by line and then echoes each line to the console. To make things more interesting (to be read: to allow the tester to alter the file using an editor), I added a delay of 1.5 seconds between every two reads. Here it is:



import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.RandomAccessFile;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.FileLock;

/**
 * This class ``slow reads'' a file (whose path is given as argument) and echoes it to the console.
 * Demonstrates that on a UNIX system exclusive file-locking semantics cannot be relied upon.
 * Use it with a reasonably medium-size file (~8000 chars).
 * Run it on a UNIX system with the command
 *     java -cp . SlowReader testinput.txt
 * While the program runs, open an editor and alter the file (towards the end).
 * (You should be able to save the changes performed in the editor.)
 * You will see that the program's output changes accordingly.
 *
 * @author Val
 */
public class SlowReader {

    private static final int TIME_TO_WAIT_BEFORE_WRITE = 1500;
    private static final int BUFFER_SIZE = 1024;

    public static void main(String[] args) throws IOException {
        if (args.length == 0 || args[0] == null) {
            return;
        }

        File f = new File(args[0]);
        if (f.exists() && f.canRead()) {
            RandomAccessFile raf = new RandomAccessFile(f, "rw");
            FileChannel fileChannel = raf.getChannel();
            FileLock lock = fileChannel.tryLock(0L, Long.MAX_VALUE, false);

            if (lock == null) {
                // couldn't lock the file :( Try later
                System.err.println("Cannot obtain exclusive access to file");
                return;
            }

            InputStream in = Channels.newInputStream(fileChannel);
            OutputStream out = System.out;
            try {
                // Transfer bytes from in to out, pausing between reads
                byte[] buf = new byte[BUFFER_SIZE];
                int len;
                while ((len = in.read(buf)) > 0) {
                    Thread.sleep(TIME_TO_WAIT_BEFORE_WRITE);
                    out.write(buf, 0, len);
                }
            } catch (IOException e) {
                System.err.println(e);
            } catch (InterruptedException e) {
                // ignored
            } finally {
                try {
                    in.close(); // this will close the channel (and release the lock) too
                } catch (IOException ioex) {
                    System.err.println(ioex);
                }
            }
        }
    }
}

On the AIX machine where I tested the above program, the input file was easily overwritten by another process. Needless to say, on Windows platforms the file cannot be written to once the Java process has obtained the lock.

Conclusion

The FileLock is deceiving! The "write once, run everywhere" slogan has been proved once again to be just marketing bullshit.

P.S. I wonder what happens on Linux platforms. One of these days I will test it on my Ubuntu box.