
A Guide to Java Stream API

Oct 17, 2024
Catherine Edelveis

Processing data sequences without numerous for-loops is possible in Java with the Stream API, which lets you handle data using functional-style operations.

This tutorial will guide you through the key concepts of the Java Stream API, show how to create streams and process data using various operations, and explain how to use Stream Gatherers, a powerful addition to the Stream API introduced as a preview feature in JDK 22 for creating custom operations.

Introduction to Java streams

The Stream API was introduced in Java 8 to give developers a way of processing collections of elements using a functional programming approach. In broad strokes, functional programming is a form of declarative programming: it describes the results we want the program to achieve rather than the sequence of steps the program must go through to get them. The latter is the domain of imperative programming.

So, in functional programming, we write or reuse a function that we want to apply to each element. We can also give the API hints about how to select elements from the data source, but that’s it: the method of looping over the elements, the arrangement of the source, and other complex tasks like parallelism are left under the hood.

To sum up, declarative programming tells the program what needs to be done, and imperative programming tells it how it needs to be done.

The Stream API therefore lets us create streams that consist of the elements of a provided collection and apply functional-style operations to each of these elements without affecting the data source. Lambdas and method references provide a convenient way of passing functions to streams.

Using the Stream API makes code more concise, and the stream operations themselves leave the data source unmodified.
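
To make the difference between the two styles concrete, here is a minimal sketch that solves the same task twice: once with a for-loop and once with a stream. The class name, the method names, and the "five characters or fewer" rule are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class ImperativeVsDeclarative {

    // Imperative: we spell out how to loop, test, and accumulate.
    static List<String> shortNamesLoop(List<String> names) {
        List<String> result = new ArrayList<>();
        for (String name : names) {
            if (name.length() <= 5) {
                result.add(name.toUpperCase());
            }
        }
        return result;
    }

    // Declarative: we describe what we want; the iteration is left to the Stream API.
    static List<String> shortNamesStream(List<String> names) {
        return names.stream()
                .filter(name -> name.length() <= 5)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Germany", "France", "Italy", "Spain");
        System.out.println(shortNamesLoop(names));   // [ITALY, SPAIN]
        System.out.println(shortNamesStream(names)); // [ITALY, SPAIN]
        System.out.println(names);                   // the source list is unchanged
    }
}
```

Both methods return the same result, and neither touches the source list; the stream version simply leaves the looping to the library.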

Core concepts of Java streams

A stream is a sequence of elements that acts as a data wrapper for a source of elements. A stream allows us to perform multiple operations on the elements of a data source without modifying the original data source. These operations are combined into a stream pipeline.

A stream pipeline consists of a source, zero or more intermediate operations, and a terminal operation:

  • A source can be an array, a collection, an I/O channel, a generator of an infinite sequence of elements, etc.
  • Intermediate operations transform a stream into another stream and support multiple ways of processing data: you can filter, transform the elements, order them, and so on.
  • Terminal operations yield a result or a side effect: for instance, you can count elements, collect them, find a particular element, or perform an action on each element with forEach.

[Figure: A stream pipeline]

There are two important characteristics of streams to keep in mind. Streams are lazy, meaning that intermediate operations won’t be executed until a terminal operation is invoked. Also, once the terminal operation has run, the stream is consumed and can’t be reused: invoking another operation on it throws an IllegalStateException.
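
Both characteristics can be observed with a small experiment. The sketch below is only an illustration (the class and method names are hypothetical): it records which elements a filter has inspected before any terminal operation is called, and then checks what happens when a consumed stream is used again.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class StreamLazinessDemo {

    // Returns the elements the filter had inspected BEFORE the terminal operation ran.
    static List<String> inspectedBeforeTerminal() {
        List<String> log = new ArrayList<>();
        Stream<String> pipeline = Stream.of("a", "bb", "ccc")
                .filter(s -> {
                    log.add(s); // side effect used only to observe execution
                    return s.length() > 1;
                });
        // No terminal operation yet, so the filter has not run and the log is empty.
        List<String> snapshot = new ArrayList<>(log);
        pipeline.collect(Collectors.toList()); // the filter runs only now
        return snapshot;
    }

    // Returns true if using a stream after its terminal operation fails.
    static boolean reuseThrows() {
        Stream<String> stream = Stream.of("a", "b");
        stream.forEach(s -> { }); // first terminal operation consumes the stream
        try {
            stream.forEach(s -> { }); // second use of the same stream
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(inspectedBeforeTerminal()); // []
        System.out.println(reuseThrows());             // true
    }
}
```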

Creating streams

You can create a stream from multiple sources, including collections, arrays, files, generator functions, and static factory methods. In addition, streams can be finite, with a predefined number of elements; infinite, with a potentially unbounded number of elements; and parallel, enabling execution on multiple cores.

Let’s look at all of that in more detail below.

Finite streams

We can obtain a stream from a collection with the stream() method:

        List<String> countries = Arrays.asList("Germany", "France", "Italy");
        Stream<String> countriesStream = countries.stream();
        countriesStream.forEach(System.out::println);

We will discuss the forEach() method below, but for now, it is a terminal operation that enables us to iterate over elements and in this case, print them out.

To obtain a stream from an array, we use Arrays.stream(). If you want to get a stream of primitives such as int, long, or double, you can use the dedicated interfaces IntStream, LongStream, or DoubleStream:

        int[] numbers = {1, 2, 3, 4};
        IntStream numbersStream = Arrays.stream(numbers);
        numbersStream.forEach(System.out::println);

Obtaining a stream from a file is possible with Files.lines():

        try (Stream<String> fileLinesStream = Files.lines(Paths.get("path/to/file"))) {
            fileLinesStream.forEach(System.out::println);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

You can also obtain a stream with a static factory method:

        DoubleStream doubleStream = DoubleStream.of(4.5, 6.7, 1.2);
        doubleStream.forEach(System.out::println);
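
A few other static factory methods are worth knowing. The following sketch shows Stream.of() for object streams and IntStream.range() / IntStream.rangeClosed() for numeric ranges; the class and method names are chosen for this example only.

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class StreamFactoriesDemo {

    // Stream.of builds an object stream directly from the given values.
    static List<String> fromValues() {
        return Stream.of("Germany", "France", "Italy")
                .collect(Collectors.toList());
    }

    // IntStream.rangeClosed includes the upper bound; boxed() converts int to Integer.
    static List<Integer> oneToFive() {
        return IntStream.rangeClosed(1, 5)
                .boxed()
                .collect(Collectors.toList());
    }

    // IntStream.range excludes the upper bound, so this sums 0 + 1 + 2 + 3.
    static int sumBelowFour() {
        return IntStream.range(0, 4).sum();
    }

    public static void main(String[] args) {
        System.out.println(fromValues());   // [Germany, France, Italy]
        System.out.println(oneToFive());    // [1, 2, 3, 4, 5]
        System.out.println(sumBelowFour()); // 6
    }
}
```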

Infinite streams

Infinite streams can be created using the generate() or iterate() method. You should also set a condition that stops the processing of elements at some point, or else the program will run indefinitely. This can be done with the limit() method, for instance.

With that in mind, let’s create an unbounded sequence of random numbers and limit it to 10 elements:

        Random random = new Random();
        IntStream randomIntsStream = IntStream.generate(random::nextInt)
                .limit(10);
        randomIntsStream.forEach(System.out::println);

In the example above, we created a stream pipeline by chaining the generate() and limit() methods. All operations that you perform on a stream can be chained into a single pipeline, so we can shrink the snippet above:

        Random random = new Random();

        IntStream.generate(random::nextInt)
                .limit(10)
                .forEach(System.out::println);
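
The iterate() method mentioned above works in a similar way: it starts from a seed and applies a function to the previous element to produce the next one. Here is a minimal sketch with hypothetical class and method names. The second method uses the three-argument overload of iterate(), available since Java 9, which takes a stop condition and so doesn’t need limit().

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class IterateDemo {

    // Infinite sequence 1, 2, 4, 8, ... cut off by limit().
    static List<Integer> firstPowersOfTwo(int count) {
        return Stream.iterate(1, n -> n * 2)
                .limit(count)
                .collect(Collectors.toList());
    }

    // Java 9+ overload: the stream ends as soon as the condition n <= max fails.
    static List<Integer> powersOfTwoUpTo(int max) {
        return Stream.iterate(1, n -> n <= max, n -> n * 2)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstPowersOfTwo(5));  // [1, 2, 4, 8, 16]
        System.out.println(powersOfTwoUpTo(100)); // [1, 2, 4, 8, 16, 32, 64]
    }
}
```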

Parallel streams

You can create parallel streams to divide a bulk of work between several threads. Under the hood, parallel streams use the fork/join framework, which splits the task into several subtasks executed by worker threads. The partial results are then merged into a single result.

To create parallel streams, you can use a parallel() method on a sequential stream:

        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5);
        int sum = nums.stream()
                .parallel()
                .map(i -> i + 1)
                .reduce(0, Integer::sum);

Or a parallelStream() method:

        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5);
        int sum = nums.parallelStream()
                .map(i -> i + 1) //this method adds 1 to each element
                .reduce(0, Integer::sum); //this method sums all elements

Note that using parallel streams is not always beneficial for performance. You can find a good comparison of parallel vs sequential streams in this article. But as a rule, the more elements you have to process and the more computation is applied to each element, the bigger the performance gains with parallel streams; for small or cheap workloads, the overhead of splitting the work and merging the results can outweigh the benefit.

Intermediate operations with streams

You have already seen some intermediate operations in action in the sections above. Let’s look at them and some other common operations in more detail.

filter()

The filter() method selects elements according to a boolean condition (a predicate). Only the elements that satisfy the condition are sent down the pipeline for further processing. For instance, let’s find and print out all even numbers:

       List<Integer> numbers = Arrays.asList(4, 7, 3, 8, 1, 2);
       numbers.stream()
               .filter(n -> n % 2 == 0)
               .forEach(System.out::println);

map()

The map() method accepts a stream of elements, applies a function to each element, and returns a stream of the results. Schematically, it looks like this:

        Stream<T> -> map(T -> A) -> Stream<A>

where T is the type of the incoming elements and A is the type of the outgoing elements.

The map() operation itself doesn’t change the elements of the source, so the original data stays intact.

For instance, let’s convert all String elements to lowercase:

        List<String> words = Arrays.asList("Apple", "Banana", "Pear");
        words.stream()
                .map(String::toLowerCase)
                .forEach(System.out::println);

flatMap()

The flatMap() method enables us to obtain a stream for each element of the incoming stream, and then unify these streams into one.

For instance, we have two classes, Order and Product:

    static class Order  {
        List<Product> products;
        double totalPrice;

        public Order(List<Product> products) {
            this.products = products;
            calculatetotalPrice(products);
        }

        private void calculatetotalPrice(List<Product> products) {
            for(Product product : products) {
                totalPrice += product.price();
            }
        }
        public List<Product> getProducts() {
            return products;
        }
        public double getTotalPrice() {
            return totalPrice;
        }

    }

    record Product (String name, int price) {
    }

Note the calculatetotalPrice() method: I will show you how to use a stream instead of a for-loop later on.

Having a List of orders, we can obtain a stream of products for each order and then combine them into one stream of products and perform some action on each product (get a name, for instance):

        Product chair = new Product("Chair", 150);
        Product table = new Product("Table", 350);
        Product pencil = new Product("Pencil", 2);

        Order firstOrder = new Order(Arrays.asList(chair, table));
        Order secondOrder = new Order(Arrays.asList(pencil, chair));

        List<Order> orders = Arrays.asList(firstOrder, secondOrder);
        orders.stream()
                .flatMap(order -> order.getProducts()
                        .stream())
                .map(Product::name)
                .forEach(System.out::println);

sorted()

The sorted() method enables us to sort the elements of the stream. Let’s take a look at the example above: we have a stream of Strings, and we can simply add sorted() without parameters to sort them in natural (here, alphabetical) order, or we can use Comparator.reverseOrder() in this method to sort the elements in descending order:

        orders.stream()
                .flatMap(order -> order.getProducts()
                        .stream())
                .map(Product::name)
                .sorted(Comparator.reverseOrder())
                .forEach(System.out::println);
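
Strings have a natural order, but for your own types you usually pass a Comparator built from one or more fields. The sketch below assumes a Product record shaped like the one above and sorts products by price, breaking ties by name; the class and method names are illustrative.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;

class SortedByFieldDemo {

    // Same shape as the Product record used in the examples above.
    record Product(String name, int price) { }

    // Cheapest first; products with equal prices are ordered alphabetically by name.
    static List<String> namesByPrice(List<Product> products) {
        return products.stream()
                .sorted(Comparator.comparingInt(Product::price)
                        .thenComparing(Product::name))
                .map(Product::name)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<Product> products = Arrays.asList(
                new Product("Chair", 150),
                new Product("Table", 350),
                new Product("Pencil", 2));
        System.out.println(namesByPrice(products)); // [Pencil, Chair, Table]
    }
}
```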

distinct()

Let’s continue with orders and products. Both orders include the same product Chair, so the output of the examples above will contain a repeating String. To get rid of duplicate elements, we can use the distinct() method:

        orders.stream()
                .flatMap(order -> order.getProducts()
                        .stream())
                .map(Product::name)
                .distinct()
                .sorted(Comparator.reverseOrder())
                .forEach(System.out::println);

skip()

The skip() method enables us to skip the first N elements of the stream:

        List<Integer> numbers = Arrays.asList(4, 7, 3, 8, 1, 2);
        numbers.stream()
                .skip(2)
                .forEach(System.out::println);

limit()

We have already seen the limit() method in action above, when we created an infinite stream. But you can, of course, use it with finite streams as well to limit the number of elements:

        List<Integer> numbers = Arrays.asList(4, 7, 3, 8, 1, 2);
        numbers.stream()
                .limit(4)
                .forEach(System.out::println);

Custom intermediate operations with Stream Gatherers

The Stream API provides a rich but fixed set of intermediate operations. Sometimes, you need to apply custom logic that the built-in operations don’t support. Luckily, Java 22 introduced a solution to that issue as a preview feature: Stream Gatherers.

A new intermediate operation, Stream::gather(Gatherer), allows us to process elements of a stream by applying user-defined logic in a gatherer, which is an instance of the Gatherer interface.

A gatherer is defined by four functions, some of which are optional:

  • Initializer (optional) provides an object that maintains private state, which is useful when a gatherer needs to compare new elements to the previous ones;
  • Integrator (required) integrates a new element from the input stream. It can also inspect the private state object and emit elements to the output stream;
  • Combiner (optional) merges the state objects of partial evaluations when the stream is processed in parallel;
  • Finisher (optional) is invoked when there are no more input elements to consume.

You can create your own gatherers by implementing the Gatherer interface or take advantage of built-in gatherers from the Gatherers class.

Let’s see how we can implement gatherers in practice. Note that Stream Gatherers were a preview feature in JDK 22 and JDK 23 and became final in JDK 24. On JDK 22 or 23, you need to enable preview features with the --enable-preview command-line flag or, in IntelliJ IDEA, select the preview language level (for example, Java 23 Preview) via Project Structure > Project > Language Level.

First, let’s implement a custom gatherer that enables us to collect unique users by country. Create a simple record User:

    record User (String username, String country) { }

Now, we need to create a DistinctByCountryGatherer class that implements the Gatherer interface:

public class DistinctByCountryGatherer<User, String> implements Gatherer<User, Set<String>, User> { }

The Gatherer<T,A,R> interface has three parameters, where: 

  • T is the type of the input elements. In our case, it will be an object of class User.
  • A is the type of the gatherer’s private state object, which can be used to track the previously seen elements. In our case, it’s a Set<String> holding the names of the countries seen so far.
  • R is the type of the output elements. In our case, it is the same as the input type.

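The DistinctByCountryGatherer declaration has two type parameters, named User and String. Be aware that inside the class these names are type variables that shadow the User record and java.lang.String, so the class is effectively generic over the element type and the key type; the concrete types are inferred by the diamond operator when the gatherer is instantiated.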

Next, we need a function that will be applied to the elements of a stream:

    private final Function<User, String> selector;

    public DistinctByCountryGatherer(Function<User, String> selector) {
        this.selector = selector;
    }

We also need to override and update two methods, initializer() and integrator():

    @Override
    public Supplier<Set<String>> initializer() {
        return HashSet::new;
    }

    @Override
    public Integrator<Set<String>, User, User> integrator() {
        return Integrator.ofGreedy((set, user, downstream) -> {
            String extractedCountry = selector.apply(user);
            if(!set.contains(extractedCountry)) {
                set.add(extractedCountry);
                downstream.push(user);
            }

            return true;
        });
    }

As mentioned above, the initializer() function provides a private state object for the gatherer to compare elements. We will store country names in a Set to identify unique ones.

The integrator() function is created by calling the Integrator.ofGreedy() method, which takes a lambda expression as an argument with three parameters: 

  • set is our private state object,
  • user is the next input element,
  • downstream is an object of Gatherer.Downstream<T> that pushes its argument down the stream pipeline.

We extract the country name from the User object by applying the selector function to it. If our private state object, a HashSet, doesn’t contain this country name yet, the name is added to the set, and the User object is pushed down the pipeline. Otherwise, the object is discarded.

Let’s use our custom Gatherer in a pipeline:

        List<User> users = Arrays.asList(
                new User("francesca", "Italy"),
                new User("mark", "Germany"),
                new User("paolo", "Italy"));
        

        users.stream()
                .gather(new DistinctByCountryGatherer<>(User::country))
                .forEach(System.out::println);

The output will be:

User[username=francesca, country=Italy]
User[username=mark, country=Germany]

You can combine multiple custom gatherers in one stream pipeline, creating tailor-made element processing.

Terminal operations with streams

We had to use some terminal operations above, such as forEach() and reduce(), simply because our intermediate operations wouldn’t execute without them. But it’s time to familiarize ourselves with more terminal operations!

collect()

The collect() method enables us to accumulate the elements of the stream into a collection or a mutable result container (for instance, you can concatenate all Strings into one String). Let’s take the example with orders and instead of printing out the results, collect them into a Set:

        Set<String> products = orders.stream()
                .flatMap(order -> order.getProducts()
                        .stream())
                .map(Product::name)
                .collect(Collectors.toSet());

If you want to gather the results into a List, you can simplify the code and use toList(), available since Java 16, instead of collect(Collectors.toList()). Note that toList() returns an unmodifiable list:

        List<String> products = orders.stream()
                .flatMap(order -> order.getProducts()
                        .stream())
                .map(Product::name)
                .distinct()
                .sorted(Comparator.reverseOrder())
                .toList();
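
Beyond toSet() and toList(), the Collectors class offers collectors for other result shapes. As a sketch of the String concatenation mentioned above, the example below uses Collectors.joining(), and it also groups elements into a Map with Collectors.groupingBy(). The class name, method names, and sample data are assumptions made for this illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

class CollectorsDemo {

    // Concatenates all names into one String, separated by ", ".
    static String joinNames(List<String> names) {
        return names.stream()
                .collect(Collectors.joining(", "));
    }

    // Groups names by their length: the key is the length, the value is the list of names.
    static Map<Integer, List<String>> byLength(List<String> names) {
        return names.stream()
                .collect(Collectors.groupingBy(String::length));
    }

    public static void main(String[] args) {
        List<String> names = Arrays.asList("Chair", "Table", "Pencil");
        System.out.println(joinNames(names)); // Chair, Table, Pencil
        System.out.println(byLength(names));  // {5=[Chair, Table], 6=[Pencil]}
    }
}
```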

forEach()

The forEach() method iterates over the elements and performs a specified action on each. We used it to print out the elements of the stream, which is the most common operation with this method.

reduce()

The reduce() method is a reduction operation, meaning that it produces a single result from a stream of elements. There are three concepts associated with reduce() that we need to understand:

  • Identity is the initial value of the operation and the default value in case the stream is empty;
  • Accumulation function takes a partial result of the operation and the next element of the stream and returns a new partial result;
  • Combining function combines the partial results in case of a parallelized reduction operation.

So, these are signatures for the reduce() method:

  • reduce(BinaryOperator<T> accumulator) accepts the accumulation function and returns an Optional<T>;
  • reduce(T identity, BinaryOperator<T> accumulator) accepts the provided identity and an accumulation function and returns a single value of type T;
  • reduce(U identity, BiFunction<U,? super T,U> accumulator, BinaryOperator<U> combiner) accepts the provided identity, an accumulation function, and a combining function and returns a value of type U.

Let’s look at the example. Previously, we used a for-loop to calculate a total price for an order.

Instead of iterating over each element manually, we can create a stream of Integers (product prices) and obtain the sum of all elements:

        private void calculatetotalPrice(List<Product> products) {
            totalPrice = products.stream()
                    .map(Product::price)
                    .reduce(0, (x, y) -> x + y);
        }

Here, value 0 is the identity that serves as the initial value of the operation, the lambda expression (x, y) -> x + y is the accumulation function, where x is the partial result and y is the next element of the stream.

You can simplify the code above by using a method reference for the accumulation function:

        private void calculatetotalPrice(List<Product> products) {
            totalPrice = products.stream()
                    .map(Product::price)
                    .reduce(0, Integer::sum);
        }
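
For comparison, the three reduce() signatures listed earlier can be sketched side by side. The class and method names are hypothetical; the point is how the result differs when the stream is empty and when the result type is not the element type.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

class ReduceOverloadsDemo {

    // One argument: there is no identity, so an empty stream yields Optional.empty().
    static Optional<Integer> max(List<Integer> numbers) {
        return numbers.stream()
                .reduce(Integer::max);
    }

    // Two arguments: the identity 0 is returned as is for an empty stream.
    static int sum(List<Integer> numbers) {
        return numbers.stream()
                .reduce(0, Integer::sum);
    }

    // Three arguments: the result type (Integer) differs from the element type (String).
    // The combiner merges partial sums when the stream is processed in parallel.
    static int totalLength(List<String> words) {
        return words.stream()
                .reduce(0, (partial, word) -> partial + word.length(), Integer::sum);
    }

    public static void main(String[] args) {
        System.out.println(max(Arrays.asList(3, 9, 4)));                      // Optional[9]
        System.out.println(sum(Arrays.asList(1, 2, 3, 4, 5)));                // 15
        System.out.println(totalLength(Arrays.asList("Apple", "Banana", "Pear"))); // 15
    }
}
```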

count() / average() / min() / max()

You can count the number of elements in the stream with count():

        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5);
        long numberOfElements = nums.stream().count();

You can find the minimum or maximum value among the elements using min() and max(), which accept a Comparator. Both methods return an Optional because a result may not exist, for example when the stream is empty. You can either keep the Optional or unwrap it and throw an exception if it is empty:

        List<Integer> nums = Arrays.asList(1, 2, 3, 4, 5);
        int minNumber = nums.stream()
                .min(Comparator.comparing(Integer::intValue))
                .orElseThrow(NoSuchElementException::new);

        Optional<Integer> maxNumber = nums.stream()
                .max(Comparator.comparing(Integer::intValue));

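You can also calculate the average of the elements with the average() method, which is available on the primitive streams (IntStream, LongStream, and DoubleStream) and returns an OptionalDouble: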

        int[] numbers = {1, 2, 3, 4, 5, 6};
        OptionalDouble average = Arrays.stream(numbers)
                .average();
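
If you need several of these values at once, primitive streams can compute them in a single pass with summaryStatistics(). This is an optional alternative rather than something the examples above depend on; the class and method names below are illustrative.

```java
import java.util.IntSummaryStatistics;
import java.util.stream.IntStream;

class SummaryStatisticsDemo {

    // Computes count, min, max, sum, and average in one traversal of the stream.
    static IntSummaryStatistics summarize(int... numbers) {
        return IntStream.of(numbers).summaryStatistics();
    }

    public static void main(String[] args) {
        IntSummaryStatistics stats = summarize(1, 2, 3, 4, 5, 6);
        System.out.println(stats.getCount());   // 6
        System.out.println(stats.getMin());     // 1
        System.out.println(stats.getMax());     // 6
        System.out.println(stats.getSum());     // 21
        System.out.println(stats.getAverage()); // 3.5
    }
}
```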

findFirst() / findAny()

With the findFirst() method, you can get the first element of the stream. The method returns an Optional:

        String firstProduct = orders.stream()
                .flatMap(order -> order.getProducts()
                        .stream())
                .map(Product::name)
                .findFirst()
                .orElse(null);

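The findAny() method returns any element of the stream. Be careful with this one: the method is free to pick whichever element is cheapest to obtain, regardless of its position in the stream, and there’s no guarantee that the same element will be chosen every time you invoke it. This mostly matters for parallel streams; on a sequential stream, it typically returns the first element, but you shouldn’t rely on that.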

        String anyProduct = orders.stream()
                .flatMap(order -> order.getProducts()
                        .stream())
                .map(Product::name)
                .findAny()
                .orElse(null);

allMatch() / anyMatch() / noneMatch()

The allMatch(), anyMatch(), and noneMatch() methods help you determine whether the elements meet the provided condition and return a boolean. The allMatch() method determines whether all elements satisfy the condition, anyMatch() — whether any one element satisfies the condition, and noneMatch() — whether no elements satisfy the condition. In all cases, the element processing stops as soon as the answer is determined:

        boolean allOrdersMatch = orders.stream()
                .allMatch(order -> order.getProducts().size() > 1);

        boolean anyOrderMatches = orders.stream()
                .anyMatch(order -> order.getTotalPrice() > 300);

        boolean noOrderMatches = orders.stream()
                .noneMatch(order -> order.getTotalPrice() > 1000);
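
Two details of the matching methods are easy to overlook: they stop early, and they behave in a specific way on empty streams. The sketch below (hypothetical names, sequential streams only) counts how many elements anyMatch() inspects and shows the results for an empty stream.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;

class MatchDemo {

    // Counts how many elements a sequential anyMatch() inspects before it can answer.
    static int inspectedByAnyMatch(List<Integer> numbers, int threshold) {
        AtomicInteger inspected = new AtomicInteger();
        numbers.stream()
                .peek(n -> inspected.incrementAndGet()) // observe each element that flows through
                .anyMatch(n -> n > threshold);
        return inspected.get();
    }

    // On an empty stream nothing can contradict allMatch() or noneMatch(),
    // and nothing can satisfy anyMatch().
    static boolean[] onEmptyStream() {
        return new boolean[] {
                Stream.<Integer>empty().allMatch(n -> n > 0),
                Stream.<Integer>empty().anyMatch(n -> n > 0),
                Stream.<Integer>empty().noneMatch(n -> n > 0)
        };
    }

    public static void main(String[] args) {
        // The second element (2) is the first one greater than 1, so processing stops there.
        System.out.println(inspectedByAnyMatch(Arrays.asList(1, 2, 3, 4, 5), 1)); // 2
        System.out.println(Arrays.toString(onEmptyStream())); // [true, false, true]
    }
}
```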

Dealing with null in streams

When working with streams, we can come across a NullPointerException if we try to perform actions on a Collection with a null value or on elements of a stream with a null value. Luckily, we can make our code NPE-proof in both situations.

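To avoid calling stream() on a null collection, we can take advantage of the Optional.ofNullable() method, which wraps the provided collection in an Optional. Calling stream() on the Optional yields a stream with the collection as its single element, or an empty stream if the collection is null; flatMap() then unpacks the collection into its elements:

        List<String> nullList = null;
        Optional.ofNullable(nullList)
                .stream()
                .flatMap(List::stream)
                .forEach(System.out::println);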

What if some objects in a stream are null? In this case, we can simply filter them out with filter():

        List<Product> listWithNulls = Arrays.asList(pencil, null, table);
        listWithNulls.stream()
                .filter(Objects::nonNull)
                .map(Product::name)
                .forEach(System.out::println);
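
Both precautions can be combined in a small utility. The streamOf() helper below is a hypothetical convenience method, not part of the JDK: it returns an empty stream for a null collection, and the pipeline that uses it filters out null elements before mapping.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class NullSafeStreams {

    // Hypothetical helper: an empty stream for a null collection, otherwise its elements.
    static <T> Stream<T> streamOf(Collection<T> collection) {
        return collection == null ? Stream.empty() : collection.stream();
    }

    // Upper-cases every non-null word; tolerates a null collection and null elements.
    static List<String> upperCased(Collection<String> words) {
        return streamOf(words)
                .filter(Objects::nonNull)
                .map(String::toUpperCase)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(upperCased(null));                              // []
        System.out.println(upperCased(Arrays.asList("pear", null, "fig"))); // [PEAR, FIG]
    }
}
```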

Conclusion

To summarize:

  • The Stream API enables developers to process a collection of elements with functional programming tools.
  • Streams describe operations that need to be performed on the elements of a data source without modifying the data source.
  • There are intermediate and terminal operations. Intermediate operations won’t be executed until a terminal operation is invoked.
  • There are multiple default intermediate and terminal operations, but you can also create a custom intermediate operation with Stream Gatherers. 
