02.함수형 데이터 처리
PART II. 함수형 데이터 처리
Stream
Stream : 데이터 처리 연산을 지원하도록 소스에서 추출된 연속된 요소
선언형으로 컬렉션 데이터를 처리
멀티스레드 코드를 구현하지 않아도 데이터를 투명하게 병렬로 처리
복잡한 루프, 조건문 등이 필요 없이 선언형 코드와 동작 파라미터화를 활용하면 변하는 요구사항에 쉽에 대응할 수 있다.
스트림 API 를 통해 데이터 처리를 병렬화로 진행하면서 스레드와 락을 걱정할 필요가 없다.
특징
선언형 : 간결성 + 가독성 향상
조립 : 유연성 향상
병렬화 : 성능 향상
상태 있는 연산과 상태 없는 연산
상태 있는 연산(stateful operation) : 값을 계산하는 데 필요한 상태를 저장하는 연산 (
reduce
,sorted
,distinct
)상태 없는 연산(stateless operation) : 상태를 저장하지 않는 연산 (
filter
,map
)
Java7
List<Dish> lowCaloricDishes = new ArrayList<>();
for (Dish d : dishes) {
if (d.getCalories() < 400) {
lowCaloricDishes.add(d);
}
}
Collections.sort(lowCaloricDishes, new Comparator<Dish>() {
@Override
public int compare(Dish d1, Dish d2) {
return Integer.compare(d1.getCalories(), d2.getCalories());
}
});
List<String> lowCaloricDishesName = new ArrayList<>();
for (Dish d : lowCaloricDishes) {
lowCaloricDishesName.add(d.getName());
}
Java8
//stream()
List<String> lowCaloricDishesName =
menu.stream()
.filter(d -> d.getCalories() < 400) //조건
.sorted(comparing(Dish::getCalories)) //정렬
.map(Dish::getName) //추출
.collect(toList()); //리스트화
//parallelStream()
List<String> lowCaloricDishesName =
menu.parallelStream()
.filter(d -> d.getCalories() < 400)
.sorted(comparing(Dish::getCalories))
.map(Dish::getName)
.collect(toList());
Example
//stream()
List<String> threeHighCaloricDishNames =
menu.stream()
.filter(dist -> dist.getCalories() < 300) //필터링
.map(Dish::getName) //추출
.limit(3) //축소
.collect(toList()); //리스트화
filter
: 스트림에서 특정 요소 제외map
: 한 요소를 다른 요소로 변환하거나 정보 추출limit
: 정해진 요소 개수 제한sorted
: 요소 정렬collect
: 스트림을 다른 형식으로 변환중간 연산 : filter, map, limit, sorted, distinct ...
중간 연산을 합친 다음 합쳐진 중간 연산을 최종 연산으로 한 번에 처리(LAZY)
최종 연산 : collect, forEach, count ..
스트림 파이프라인에서 결과를 도출
Stream VS Collection
데이터 계산 시점
Collection
: 현재 자료구조가 포함하는 모든 값을 메모리에 저장하고 모든 요소는 컬렉션에 추가 전에 요소는 미리 계산되어야 함모든 정보가 로딩될 때까지 기다려야만 한다. ex) DVD
Stream
: 요청할 때만 요소를 계산 (LAZY)로딩된 일부 데이터를 먼저 볼 수 있다. ex) 스트리밍
단 한 번만 소비할 수 있다. -> 다시 탐색 시 새로운 스트림 생성이 필요
데이터 반복 처리 방법
Collection
: 사용자가 직접 요소를 반복 (외부 반복)Stream
: 반복을 알아서 처리하고 결과 스트림값을 어딘가에 저장 (내부 반복)내부 반복의 경우 투명하게 병렬 처리, 더 최적화된 다양한 순서로 처리가 가능
Stream 활용
Filtering
filter, distinct
: Predicate 를 인수로 받아 일치하는 모든 요소를 포함하는 스트림 반환List<Dish> vegetarianMenu = menu.stream() .filter(Dish::isVegetarian) .distinct() //중복 필터링(hashCode, equals 로 결정) .collect(toList());
Slicing
takeWhile, dropWhile
: Predicate 를 이용한 슬라이싱 (필터에 걸리면 반복 중단)List<Dish> sliceMenu1 = specialMenu.stream() .takeWhile(dish -> dish.getCalories() < 320) .collect(toList());
슬라이싱 후 나머지 요소 선택
List<Dish> sliceMenu2 = specialMenu.stream() .dropWhile(dish -> dish.getCalories() < 320) .collect(toList());
limit
: 스트림 축소List<Dish> dishes = specialMenu.stream() .filter(dish -> dish.getCalories() > 300) .limit(3) .collect(toList());
skip
: 건너뛰기(처음 n개 요소 제외)List<Dish> dishes = menu.stream() .filter(dish -> dish.getCalories() > 300) .skip(2) .collect(toList());
Mapping
map
: 특정 함수 적용 결과 매핑List<String> dishNames = menu.stream() .map(Dish::getName) //요리명 추출 .map(String::length) //요리명 글자 길이 추출 .collect(toList());
flatMap
: 생성된 스트림을 하나의 스트림으로 평명화List<String> uniqueCharacters = words.stream() .map(word -> word.split("")) // 각 단어를 개별 문자를 포함하는 배열로 변환 .flatMap(Arrays:stram) .distinct() .forEach(toList());
Serching & Maching
allMatch
: 스트림에서 적어도 한 요소와 일치하는지 확인boolean isVegetarian = menu.stream() .anyMatch(Dish::isVegetarian)
anyMatch
: 스트림의 모든 요소가 일치하는지 검사boolean isHealthy = menu.stream() .allMatch(dish -> dish.getCalories() < 1000);
nonMatch
: 스트림의 요소 중 일치하는 요소가 없는지 확인boolean isHealthy = menu.stream() .noneMatch(dish -> dish.getCalories() >= 1000);
findFirst
: 스트림에서 첫 번째 요소 찾기Optional<Integer> firstSquaredDivisibleByThree = someNumbers.stream() .map(n -> n * n) .filter(n -> n % 3 == 0) .findFirst();
findAny
: 스트림에서 임의의 요소 반환Optional<Dish> dish = menu.stream() .filter(Dish:isVegetarian) .findAny();
Reducing
reduce
: 스트림의 모든 요소를 처리해서 값으로 도출 (초기값, BinaryOperator)누적자를 초깃값으로 설정한 다음 BinaryOperator 로 스트림의 각 요소를 반복적으로 누적자와 합쳐 스트림을 하나의 값으로 리듀싱
//덧셈 int sum = numvers.stream() .reduce(0, Integer::sum) //.reduce(0, (a, b) -> a + b); // 곱셈 int product = numvers.stream() .reduce(1, (a, b) -> a * b); // 최댓값 Optional<Integer> max = numbers.stream() .reduce(Integer::max);
숫자형 스트림
mapToInt
,mapToDouble
,mapToLong
메서드는 map 과 같은 기능을 수행하지만 Stream 대신 특화된 스트림을 반환int calories = menu.stream() //Stream<Dish> 반환 .mapToInt(Dish:getCalories) //IntStream 반환 .sum();
boxed
: 특화 스트림을 일반 스트림으로 변환하기IntStream intStream = menu.stream().mapToInt(Dish::getCalories); Stream<Integer> stream = intStream.boxed();
OptionalInt
,OptionalDouble
,OptionalLong
으로 이전 값의 존재 여부를 확인할 수 있다.OptaionInt maxCalories = menu.stream() .mapToInt(Dish:getCalories) .max(); int max = maxCalories.orElse(1);
range
(인수 미포함),rangeClosed
(인수 포함) 로 숫자를 생성할 수 있다.int count = IntStream.rangeClosed(1, 100) //1~100 범위 .filter(n -> n % 2 == 0) //짝수 스트림 .count(); //50
스트림 생성
값으로 스트림 생성
Stream<String> stream = Stream.of("Modern", "Java", "in", "Action");
stream.map(String::toUpperCase).forEach(System.out::println);
Stream<String> emptyStream = Stream.empty();
Null이 될 수 있는 객체로 스트림 생성
//null이 될 수 있는 객체를 포함하는 Stream
Stream<String> values = Stream.of("config", "home", "user")
.flatMap(key -> Stream.ofNullable(System.getProperty(key)));
배열로 스트림 생성
int[] numbers = {2, 3, 5, 7, 11, 13, 15, 17};
int sum = Arrays.stream(numbers).sum();
파일로 스트림 생성
Files.lines
: 주어진 파일의 행 스트림을 문자열로 반환
//고유 단어의 수 계산
long uniqueWords = 0;
try(Stream<String> lines = Files.lines(Paths.get("modernJavaInAction/data.txt"), Charset.defaultCharset())) { //AutoCloseable
uniqueWords = lines.flatMap(line -> Arrays.stream(line.split(" "))) //고유 단어 수
.distinct()
.count();
} catch(IOException e) {
}
무한 스트림 생성
Stream.iterate
: 생산된 각 값을 연속적으로 계산//(iterate) 짝수 스트림 생성 : 생산된 각 값을 연속적으로 계산 Stream.iterate(0, n -> n + 1) .limit(10) .forEach(System.out::println); //무한 스트림 생성을 중단하는 방법 IntStream.iterate(0, n -> n<100, n -> n+4) .forEach(System.out::println); IntStream.iterate(0, n -> n+4) .takeWhile(n -> n<100) .forEach(System.out::println);
Stream.generate
: 생산된 각 값을 연속적으로 계산하지 않음 (상태가 없는 메서드에 주로 사용)Stream.generate(Math::random) .limit(5) .forEach(System.out::pringln);
스트림으로 데이터 수집
Collectors Class 에서 제공하는 메서드
리듀싱과 요약
요소 그룹화
요소 분할
Reducing
// Collectors.counting
long howManyDishes1 = menu.stream().collect(Collectors.counting());
long howManyDishes2 = menu.stream().count();
// Collectors.maxBy (minBy)
//:주어진 비교자를 이용해서 스트림의 최솟값 요소를 Optional로 감싼 값을 반환 (요소가 없을 경우 return Optional.empty())
Comparator<Dish> dishCaloriesComparator = Comparator.comparingInt(Dish::getCalories);
Optional<Dish> mostCaloriesDish = menu.stream().collect(maxBy(dishCaloriesComparator));
// Collectors.summingInt (summingLong, summingDouble)
//:스트림 항목에서 정수 프로퍼티값의 합
int totalCalories = menu.stream().collect(summingInt(Dish::getCalories));
// Collectors.averagingInt (averagingLong, averagingDouble)
//:스트림 항목에서 정수 프로퍼티값의 평균값
double avgCalories = menu.stream().collect(averagingInt(Dish::getCalories));
// Collectors.summarizingInt (summarizingLong, summarizingDouble)
//:스트림 내 항목의 최대, 최소, 합계, 평균 등의 정수 정보 통계 수집
IntSummaryStatistics menuStatistics = menu.stream().collect(summarizingInt(Dish::getCalories));
IntSummaryStatistics{count=10, sum=4500, min=80, average=512.1221, max=780}
// Collectors.joining
//:내부적으로 StringBuilder를 이용하여 문자열 생성
String shortMenu = menu.strea().map(Dish::getName).collect(joining(", "));
범용 리듀싱 요약 연산
문제를 해결할 수 있는 다양한 해결 방법 중 문제에 특화된 해결책을 골라, 가독성과 성능을 모두 잡아보자.
// max (시작값이 없으므로 Optional 반환)
Optional<Dish> mostCalorieDish = menu.stream()
.collect(reducing((d1, d2) -> d1.getCalories() > d2.getCalories() ? d1 : d2));
// sum
int totalCalories = menu.stream().collect(reducing(0, Dish::getCalories, (i, j) -> i + j));
int totalCalories = menu.stream().collect(Dish::getCalories, Integer::sum);
int totalCalories = menu.stream().map(Dish::getCalories).reduce(Integer::sum).get
int totalCalories = menu.stream().mapToInt(Dish::getCalories).sum()
Grouping
분류 함수
Collectors.groupingBy(f, toList())
하나의 프로퍼티값(Key)을 기준으로 스트림의 항목을 그룹화
// Type Groupging
//{FISH=[salmon], OTHER=[fries, rice], MEAT=[pork, beef, chichen]}
Map<Dish.Type, List<Dish>> dishesByType = menu.stream().collect(groupingBy(Dish::getType));
// Calories Groupging
public enum CaloricLevel { DIET, NORMAL, FAT }
Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = menu.stream().collect(
groupingBy(dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
}));
그룹화된 요소 조작
// Type & Calories Groupging (데이터가 없는 Type 도 포함)
//{FISH=[], OTHER=[fries, rice], MEAT=[pork, beef, chichen]}
Map<Dish.Type, List<Dish>> caloricDishedByType = menu.stream()
.collect(groupingBy(Dish::getType, filtering(dish -> dish.getCalories() > 500, toList())));
// 그룹의 각 요리를 관리 이름 목록으로 변환
Map<Dish.Type, List<String>> dishNamesByType = menu.stream()
.collect(groupingBy(Dish::getType, mapping(Dish.getName, toList())));
// 각 형식의 요리 태그 추출
//{FISH=[roasted, tasty], OTHER=[fried, fresh], MEAT=[salty, greasy]}
Map<Dish.Type, Set<String>> dishNamesByType = menu.stream()
.collect(groupingBy(Dish::getType, flatMapping(dish -> dishTags.get(dish.getName()).stream(), toSet())));
다수준 그룹화
public enum CaloricLevel { DIET, NORMAL, FAT }
Map<Dish.Type, Map<CaloricLevel, List<Dish>>> dishesByTypeCaloricLevel = menu.stream()
.collect(
groupingBy(Dish::getType, // 첫 번째 수준의 분류 함수
groupingBy(dish -> { // 두 번째 수준의 분류 함수
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
})
)
);
//{FISH={DIET=[prawns], NORMAL=[salmon]}, MEAT={DIET=[chicken], NORMAL=[beef], FAT=[pork]}, ...}
서브그룹으로 데이터 수집
처음부터 값이 존재하지 않는 Key 는 Map 에 추가되지 않으므로 Optional wrapper 를 사용할 필요가 없음
groupingBy Collector 는 Stream 첫 번째 요소를 찾은 이후에 그룹화 맵에 새로운 키를 추가(lazy)
리듀싱 컬렉터는 절대 Optional.empty()를 반환하지 않음
collectingAndThen 는 적용할 컬렉터와 변환 함수를 인수로 받아 다른 컬렉터를 반환
//요리 수를 종류별로 연산
//{MEATH=3, FISH=5, OTHER=2}
Map<Dish.Type, Long> typesCount = menu.stream().collect(
groupingBy(Dish::getType, counting()));
//요리 종류 중 가장 높은 칼로리를 갖는 요리
//{MEATH=Optional[pork], FISH=Optional[salmon], OTHER=Optional[buger]}
Map<Dish.Type, Optional<Dish>> mostCaloricByType = menu.stream().collect(
groupingBy(Dish::getType, maxBy(comparingInt(Dish::getCalories))));
//{MEATH=pork, FISH=salmon, OTHER=buger}
Map<Dish.Type, Dish> mostCaloricByType = menu.stream().collect(
groupingBy(Dish::getType, //분류 함수
collectingAndThen(
maxBy(comparingInt(Dish::getCalories)), // 감싸인 컬렉터
Optional::get) // 변환 함수
)
);
//각 요리 형식에존재하는 모든 CaloricLevel 값
//{MEATH=[DIET, NORMAL], FISH=[NORMAL, FAT], OTHER=[DIET, NORMAL]}
Map<Dish.Type, Set<CaloricLevel>> caloricLevelsByType = menu.stream().collect(
groupingBy(Dish::getType, mapping(dish -> {
if (dish.getCalories() <= 400) return CaloricLevel.DIET;
else if (dish.getCalories() <= 700) return CaloricLevel.NORMAL;
else return CaloricLevel.FAT;
},
toCollection(HashSet::new)))
);
Partitioning
분할 함수
return Boolean (true or false)
분할 함수는 참, 거짓 두 가지 요소의 스트림 리스트를 모두 유지하는 장점이 있다.
프레디케이트를 스트림의 각 항목에 적용한 결과로 항목 분할
//{false=[pork, beef, salmon], true=[pizza, rice]}
Map<Boolean, List<Dish>> partitionedMenu = menu.stream().collect(
partitioningBy(Dish::isVegetarian));
//{false={MEATH=[DIET, NORMAL], FISH=[NORMAL, FAT]}, true={OTHER=[DIET, NORMAL]}}
Map<Boolean, Map<Dish.Type, List<Dish>>> vegetarianDishesByType = menu.stream().collect(
partitioningBy(Dish::isVegetarian, //- 분할 함수
groupingBy(Dish::getType))); //- 두 번째 컬렉터
//채식 요리와 채식이 아닌 요리 각 그룹에서 가장 칼로리가 높은 요리
//{false=pork, true=pizza}
Map<Boolean, Dish> mostCaloricPartitionedByVegetarian = menu.stream().collect(
partitioningBy(Dish::isVegetarian,
collectingAndThen(maxBy(comparingInt(Dish::getCalories)),
Optional::get)));
Example
숫자를 소수와 비소수로 분할하기
public boolean isPrime(int candidate) {
int candidateRoot = (int) Math.sqrt((double) candidate);
return IntStream.range(2, candidateRoot)
.noneMatch(i -> candidate % i == 0);
}
public Map<Boolean, List<Integer>> partitionPrimes(int n) {
return IntStream.rangeClosed(2, n).boxed()
.collect(partitioningBy(candidate -> isPrime(candidate)));
}
Collector
Collector Interface
/* T : 수집될 스트림 항목의 제네릭 형식
* A : 수집 과정에서 중간 결과를 누적하는 객체의 형식(누적자)
* R : 수집 연산 결과 객체의 형식
* supplier() -> accumulator() -> combiner() -> finisher()
*/
public interface Collector<T, A, R> {
//supplier() : 새로운 결과 컨테이너 만들기(수집 과정에서 빈 누적자 인스턴스를 만드는 파라미터가 없는 함수)
Supplier<A> supplier();
//accumulator() : 결과 컨테이너에 스트림 요소 추가하기(리듀싱 연산을 수행하는 함수 반환)
BiConsumer<A, T> accumulator();
//combiner() : 두 결과 컨테이너 병합(스트림의 서로 다른 서브파트를 병렬로 처리할 때 누적자가 결과를 어떻게 처리할지 정의)
BinaryOperator<A> combiner();
//finisher() : 최종 변환값을 결과 컨터네이너 적용하기
Function<A, R> finisher();
// characteristics() : 컬렉터의 연산을 정의하는 Characteristics 형식의 불변 집합 반환(어떤 최적화를 이용해 리듀싱 연산을 수행할 것인지 힌트 제공)
Set<Characteristics> characteristics();
}
Example toList()
public class ToListCollector<T> implements Collector<T, List<T>, List<T>> {
@Override
public Supplier<List<T>> supplier() { //<- 수집 연산의 시작
return () -> new ArrayList<T>(); //= return ArrayList::new; (생성자 참조)
}
@Override
public BiConsumer<List<T>, T> accumulator() { //<- 탐색한 항목을 누적하고 바로 누적자를 수정
return (list, item) -> list.add(item); //= return LisT::add;
}
@Override
public Function<List<T>, List<T>> finisher() {
return Fcuntion.identity(); //<- 항등 함수
}
@Override
public BinaryOperator<List<T>> combiner() {
return (list1, list2) -> { //<- 두 번째 콘텐츠와 합쳐서 첫 번째 누적자 수정
list1.addAll(list2); //<- 변경된 첫 번째 누적자 반환
return list1;
};
}
@Override
public Set<Characteristics> characteristics() {
/* UNORDERED: 리듀싱 결과는 스트림 요소의 방문 순서나 누적 순서에 영향을 받지 않음
* CONCURRENT: 다중 스레드에서 accumulator 함수를 동시에 호출할 수 있으며, 스트림의 병렬 리듀싱 수행 가능
* IDENTITY_FINISH: 리듀싱 과정의 최종 결과로 누적자 객체를 바로 사용
*/
return Collections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, CONCURRENT));
}
}
사용하기
//Before
List<Dish> dishes = menu.stream().collect(toList());
//After
List<Dish> dishes = menu.stream().collect(new ToListCollector<Dish>());
병렬 데이터 처리와 성능
병렬 스트림
parallelStream()
: 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림내부적으로
ForkJoinPool
을 사용
//parallel() : 순차 스트림을 병렬 스트림으로
//sequential() : 병렬 스트림을 순차 스트림으로
public Long parallelSum(long n) {
return Stream.iterate(1L, i -> i + 1)
.limit(n)
.parallel()
.reduce(0L, Long::sum);
}
스트림 성능 측정
Java Microbenchmark Harness(JMH) 라이브러리를 통해 벤치마크 구현이 가능
// https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-core (핵심 JMH 구현 포함) implementation group: 'org.openjdk.jmh', name: 'jmh-core', version: '1.34' // https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-generator-annprocess (JAR 파일 생성에 도움을 주는 어노테이션 프로세서 포함) testImplementation group: 'org.openjdk.jmh', name: 'jmh-generator-annprocess', version: '1.34'
함수 성능 측정
올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘할 수 있다.
함수형 프로그래밍을 올바르게 사용하여 병렬 실행의 힘을 이용해보자.
@BenchmarkMode(Mode.AverageTime) //<- 벤치마크 대상 메서드 실행에 걸린 평균 시간 측정 @OutputTimeUnit(TimeUnit.MILLISECONDS) //<- 벤치마크 결과를 밀리초 단위 출력 @Fork(value = 2, jvmArgs = { "-Xms4G", "-Xmx4G" }) //<- 4GB 힙 공간을 제공한 환경에서 두 번의 수행을 통해 결과 신뢰성 확보 public class ParallelStreamBenchmark { private static final long N = 10_000_000L; @Benchmark //<- 벤치마크 대상 메서드 public long sequentialSum() { return Stream.iterate(1L, i -> i + 1).limit(N).reduce(0L, Long::sum); } @Benchmark public long parallelRangedSum() { /* iterate() 대신 LongStream.rangeClosed() * 기본형 long을 직접 사용하여 박싱, 언박싱 오버헤드 해결 * 청크로 분할할 수 있는 숫자 범위 생산 */ return LongStream.rangeClosed(1, N).parallel().reduce(0L, Long::sum); } @TearDown(Level.Invocation) //<- 매 벤치마크 실행 후 GC 동작 시도 public void tearDown() { System.gc(); } }
병렬 스트림 주의점
병렬화를 이용하려면,
스트림을 재귀적으로 분할
해야 하고,각 서브 스트림을
서로 다른 스레드의 리듀싱 연산으로 할당
해야 하고,이들
결과를 하나의 값으로 합쳐
야 한다.멀티 코어 간의 데이터 이동은 생각보다 비싸므로, 코어 간 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 처리하자.
또한, 병렬 스트림과 병렬 계산에서는 공유된 가변 상태를 피하자.
상태 공유에 따른 부작용
public static long sideEffectParallelSum(long n) { Accumulator accumulator = new Accumulator(); LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add); return accumulator.total; } public static class Accumulator { private long total = 0; public void add(long value) { total += value; } } // 여러 스레드에서 동시에 누적자를 수정하면서 올바른 결과값이 나오지 않게 된다. System.out.println(ParallelStreams.sideEffectParallelSum(10_000_000L))
병렬 스트림 효과적으로 사용하기
확신이 서지 않으면 직접 측정하자.
순차 스트림과 병렬 스트림 중 어떤 것이 좋을지 모르겠다면 벤치마크로 성능을 측정해보자.
박싱을 주의하자.
자동 박식/언박싱은 성능을 크게 저하시킬 수 있는 요소다.
박싱 동작을 피하도록 되도록 기본형 특화 스트림을 사용해보자. (IntStream, LongStream, DoubleStream)
순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다.
요소의 순서에 의존(limit, findFirst)하는 연산은 병렬 스트림에서 더 비싼 비용이 들어간다.
스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하자.
N(처리해야 할 요소 수) * Q(하나의 요소 처리 비용)
Q가 높아지는 것은 병렬 스트림으로 성능 개선 가능성이 있음을 의미한다.
소량의 데이터에서는 병렬 스트림이 도움이 되지 않는다.
소량의 데이터는 병렬화 과정에서 생기는 부가 비용을 상쇄할 만큼의 이득을 얻지 못한다.
스트림을 구성하는 자료구조가 적절한지 확인하자.
ex. ArrayList를 LinkedList 보다 효율적으로 분할할 수 있다.
스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
Sized 스트림은 정확히 같은 크기의 두 스트림으로 분할 가능하여 효과적이지만, 필터 연산은 스트림 길이 예측이 불가하여 효과적인지 알 수 없다.
최종 연산의 병합 과정 비용을 살펴보자.
병합 과정 비용이 비싸다면 병렬 스트림으로 얻은 성능 이익이 상쇄
ArrayList
Excellence
LinkedList
Bad
IntStream.range
Excellence
Stream.iterate
Bad
HashSet
Good
TreeSet
Good
포크/조인 프레임워크
병렬 스트림을 제대로 사용하기 위해 병렬 스트림 내부 구조를 살펴보자.
병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 후, 서브태스크 각각의 결과를 합쳐 전체 결과를 만들도록 설계
서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService Intergace 구현
스레드 풀을 이용하려면, RecursiveTask의 서브클래스를 만들어야 한다.
포크/조인 프레임워크의 알고리즘은
분할/정복 알고리즘의 병렬화 버전이다.
포크/조인 프레임워크 제대로 사용하기
join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다.
RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말자.
대신 compute나 fork 메서드 직접 호출하고, 순차 코드에서 병렬 계산을 시작 시에만 invoke 사용
서브태스크에서 fork 메서드를 호출해서 ForkJoinPool의 일정 조절
한 쪽 작업에는 fork, 다른 한 쪽 작업에는 compute 를 호출하자.
두 서브 태스크의 한 태스크에는 같은 스레드를 재사용할 수 있다.
포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.
멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠른 것은 아니다.
포크/조인 프레임워크의 작업 훔치기
포크/조인 프레임워크의 병렬 처리 방법
풀에 있는 작업자 스레드의 태스크를 재분배하고 균형을 맞출 때
작업 훔치기 알고리즘
을 사용작업 훔치기 알고리즘
: 스레드는 자신에게 할당된 태스크를 포함하는이중 연결 리스트를 참조
하며 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와 작업을 처리 -> 할 일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 (모든 큐가 빌 때까지)다른 스레드 큐의 꼬리에서 작업을 훔쳐
온다.
따라서, 태스크 크기를 작게 나누어야 작업자 스레드 간의 작업 부화를 비슷한 수준으로 유지할 수 있다.
Last updated