02.함수형 데이터 처리

PART II. 함수형 데이터 처리

Stream

Stream : 데이터 처리 연산을 지원하도록 소스에서 추출된 연속된 요소

  • 선언형으로 컬렉션 데이터를 처리

  • 멀티스레드 코드를 구현하지 않아도 데이터를 투명하게 병렬로 처리

  • 복잡한 루프, 조건문 등이 필요 없이 선언형 코드와 동작 파라미터화를 활용하면 변하는 요구사항에 쉽에 대응할 수 있다.

  • 스트림 API 를 통해 데이터 처리를 병렬화로 진행하면서 스레드와 락을 걱정할 필요가 없다.

특징

  • 선언형 : 간결성 + 가독성 향상

  • 조립 : 유연성 향상

  • 병렬화 : 성능 향상

상태 있는 연산과 상태 없는 연산

  • 상태 있는 연산(stateful operation) : 값을 계산하는 데 필요한 상태를 저장하는 연산 (reduce, sorted, distinct)

  • 상태 없는 연산(stateless operation) : 상태를 저장하지 않는 연산 (filter, map)

Java7

List<Dish> lowCaloricDishes = new ArrayList<>();
for (Dish d : dishes) {
    if (d.getCalories() < 400) {
        lowCaloricDishes.add(d);
    }
}

Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
    @Override
    public int compare(Dish d1, Dish d2) {
        return Integer.compare(d1.getCalories(), d2.getCalories());
    }
});
List<String> lowCaloricDishesName = new ArrayList<>();
for (Dish d : lowCaloricDishes) {
    lowCaloricDishesName.add(d.getName());
}

Java8

Example

  • filter : 스트림에서 특정 요소 제외

  • map : 한 요소를 다른 요소로 변환하거나 정보 추출

  • limit : 정해진 요소 개수 제한

  • sorted : 요소 정렬

  • collect : 스트림을 다른 형식으로 변환

  • 중간 연산 : filter, map, limit, sorted, distinct ...

    • 중간 연산을 합친 다음 합쳐진 중간 연산을 최종 연산으로 한 번에 처리(LAZY)

  • 최종 연산 : collect, forEach, count ..

    • 스트림 파이프라인에서 결과를 도출

Stream VS Collection

데이터 계산 시점

  • Collection : 현재 자료구조가 포함하는 모든 값을 메모리에 저장하고 모든 요소는 컬렉션에 추가 전에 요소는 미리 계산되어야 함

    • 모든 정보가 로딩될 때까지 기다려야만 한다. ex) DVD

  • Stream : 요청할 때만 요소를 계산 (LAZY)

    • 로딩된 일부 데이터를 먼저 볼 수 있다. ex) 스트리밍

    • 단 한 번만 소비할 수 있다. -> 다시 탐색 시 새로운 스트림 생성이 필요

데이터 반복 처리 방법

  • Collection : 사용자가 직접 요소를 반복 (외부 반복)

  • Stream : 반복을 알아서 처리하고 결과 스트림값을 어딘가에 저장 (내부 반복)

    • 내부 반복의 경우 투명하게 병렬 처리, 더 최적화된 다양한 순서로 처리가 가능

Stream 활용

Filtering

  • filter, distinct : Predicate 를 인수로 받아 일치하는 모든 요소를 포함하는 스트림 반환

Slicing

  • takeWhile, dropWhile : Predicate 를 이용한 슬라이싱 (필터에 걸리면 반복 중단)

  • 슬라이싱 후 나머지 요소 선택

  • limit : 스트림 축소

  • skip : 건너뛰기(처음 n개 요소 제외)

Mapping

  • map : 특정 함수 적용 결과 매핑

  • flatMap : 생성된 스트림을 하나의 스트림으로 평명화

Serching & Maching

  • allMatch : 스트림에서 적어도 한 요소와 일치하는지 확인

  • anyMatch : 스트림의 모든 요소가 일치하는지 검사

  • nonMatch : 스트림의 요소 중 일치하는 요소가 없는지 확인

  • findFirst : 스트림에서 첫 번째 요소 찾기

  • findAny : 스트림에서 임의의 요소 반환

Reducing

  • reduce : 스트림의 모든 요소를 처리해서 값으로 도출 (초기값, BinaryOperator)

  • 누적자를 초깃값으로 설정한 다음 BinaryOperator 로 스트림의 각 요소를 반복적으로 누적자와 합쳐 스트림을 하나의 값으로 리듀싱

숫자형 스트림

  • mapToInt, mapToDouble, mapToLong 메서드는 map 과 같은 기능을 수행하지만 Stream 대신 특화된 스트림을 반환

  • boxed : 특화 스트림을 일반 스트림으로 변환하기

  • OptionalInt, OptionalDouble, OptionalLong 으로 이전 값의 존재 여부를 확인할 수 있다.

  • range(인수 미포함), rangeClosed(인수 포함) 로 숫자를 생성할 수 있다.

스트림 생성

값으로 스트림 생성

Null이 될 수 있는 객체로 스트림 생성

배열로 스트림 생성

파일로 스트림 생성

  • Files.lines : 주어진 파일의 행 스트림을 문자열로 반환

무한 스트림 생성

  • Stream.iterate : 생산된 각 값을 연속적으로 계산

  • Stream.generate : 생산된 각 값을 연속적으로 계산하지 않음 (상태가 없는 메서드에 주로 사용)

스트림으로 데이터 수집

  • Collectors Class 에서 제공하는 메서드

    • 리듀싱과 요약

    • 요소 그룹화

    • 요소 분할

Reducing

범용 리듀싱 요약 연산

  • 문제를 해결할 수 있는 다양한 해결 방법 중 문제에 특화된 해결책을 골라, 가독성과 성능을 모두 잡아보자.

Grouping

  • 분류 함수

    • Collectors.groupingBy(f, toList())

    • 하나의 프로퍼티값(Key)을 기준으로 스트림의 항목을 그룹화

그룹화된 요소 조작

다수준 그룹화

서브그룹으로 데이터 수집

  • 처음부터 값이 존재하지 않는 Key 는 Map 에 추가되지 않으므로 Optional wrapper 를 사용할 필요가 없음

    • groupingBy Collector 는 Stream 첫 번째 요소를 찾은 이후에 그룹화 맵에 새로운 키를 추가(lazy)

      • 리듀싱 컬렉터는 절대 Optional.empty()를 반환하지 않음

    • collectingAndThen 는 적용할 컬렉터와 변환 함수를 인수로 받아 다른 컬렉터를 반환

Partitioning

  • 분할 함수

    • return Boolean (true or false)

    • 분할 함수는 참, 거짓 두 가지 요소의 스트림 리스트를 모두 유지하는 장점이 있다.

    • 프레디케이트를 스트림의 각 항목에 적용한 결과로 항목 분할

Example

  • 숫자를 소수와 비소수로 분할하기

Collector

  • Collector Interface

  • Example toList()

  • 사용하기

병렬 데이터 처리와 성능

병렬 스트림

  • parallelStream() : 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림

  • 내부적으로 ForkJoinPool 을 사용

스트림 성능 측정

  • Java Microbenchmark Harness(JMH) 라이브러리를 통해 벤치마크 구현이 가능

  • 함수 성능 측정

    • 올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘할 수 있다.

    • 함수형 프로그래밍을 올바르게 사용하여 병렬 실행의 힘을 이용해보자.

병렬 스트림 주의점

  • 병렬화를 이용하려면, 스트림을 재귀적으로 분할해야 하고,

  • 각 서브 스트림을 서로 다른 스레드의 리듀싱 연산으로 할당해야 하고,

  • 이들 결과를 하나의 값으로 합쳐야 한다.

  • 멀티 코어 간의 데이터 이동은 생각보다 비싸므로, 코어 간 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 처리하자.

  • 또한, 병렬 스트림과 병렬 계산에서는 공유된 가변 상태를 피하자. 상태 공유에 따른 부작용

병렬 스트림 효과적으로 사용하기

  • 확신이 서지 않으면 직접 측정하자.

    • 순차 스트림과 병렬 스트림 중 어떤 것이 좋을지 모르겠다면 벤치마크로 성능을 측정해보자.

  • 박싱을 주의하자.

    • 자동 박식/언박싱은 성능을 크게 저하시킬 수 있는 요소다.

    • 박싱 동작을 피하도록 되도록 기본형 특화 스트림을 사용해보자. (IntStream, LongStream, DoubleStream)

  • 순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다.

    • 요소의 순서에 의존(limit, findFirst)하는 연산은 병렬 스트림에서 더 비싼 비용이 들어간다.

  • 스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하자.

    • N(처리해야 할 요소 수) * Q(하나의 요소 처리 비용)

    • Q가 높아지는 것은 병렬 스트림으로 성능 개선 가능성이 있음을 의미한다.

  • 소량의 데이터에서는 병렬 스트림이 도움이 되지 않는다.

    • 소량의 데이터는 병렬화 과정에서 생기는 부가 비용을 상쇄할 만큼의 이득을 얻지 못한다.

  • 스트림을 구성하는 자료구조가 적절한지 확인하자.

    • ex. ArrayList를 LinkedList 보다 효율적으로 분할할 수 있다.

  • 스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.

    • Sized 스트림은 정확히 같은 크기의 두 스트림으로 분할 가능하여 효과적이지만, 필터 연산은 스트림 길이 예측이 불가하여 효과적인지 알 수 없다.

  • 최종 연산의 병합 과정 비용을 살펴보자.

    • 병합 과정 비용이 비싸다면 병렬 스트림으로 얻은 성능 이익이 상쇄

소스
분해성

ArrayList

Excellence

LinkedList

Bad

IntStream.range

Excellence

Stream.iterate

Bad

HashSet

Good

TreeSet

Good

포크/조인 프레임워크

병렬 스트림을 제대로 사용하기 위해 병렬 스트림 내부 구조를 살펴보자.

병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 후, 서브태스크 각각의 결과를 합쳐 전체 결과를 만들도록 설계

서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService Intergace 구현

  • 스레드 풀을 이용하려면, RecursiveTask의 서브클래스를 만들어야 한다.

포크/조인 프레임워크의 알고리즘은 분할/정복 알고리즘의 병렬화 버전이다.

포크/조인 프레임워크 제대로 사용하기

  • join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다.

  • RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말자.

    • 대신 compute나 fork 메서드 직접 호출하고, 순차 코드에서 병렬 계산을 시작 시에만 invoke 사용

  • 서브태스크에서 fork 메서드를 호출해서 ForkJoinPool의 일정 조절

    • 한 쪽 작업에는 fork, 다른 한 쪽 작업에는 compute 를 호출하자.

    • 두 서브 태스크의 한 태스크에는 같은 스레드를 재사용할 수 있다.

  • 포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.

  • 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠른 것은 아니다.

포크/조인 프레임워크의 작업 훔치기

포크/조인 프레임워크의 병렬 처리 방법

  • 풀에 있는 작업자 스레드의 태스크를 재분배하고 균형을 맞출 때 작업 훔치기 알고리즘을 사용

    • 작업 훔치기 알고리즘 : 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하며 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와 작업을 처리 -> 할 일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 (모든 큐가 빌 때까지) 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다.

  • 따라서, 태스크 크기를 작게 나누어야 작업자 스레드 간의 작업 부화를 비슷한 수준으로 유지할 수 있다.

Last updated