List<String> dishNames =menu.stream().map(Dish::getName) //요리명 추출.map(String::length) //요리명 글자 길이 추출.collect(toList());
flatMap : 생성된 스트림을 하나의 스트림으로 평명화
List<String> uniqueCharacters =words.stream().map(word ->word.split("")) // 각 단어를 개별 문자를 포함하는 배열로 변환.flatMap(Arrays:stram).distinct().forEach(toList());
reduce : 스트림의 모든 요소를 처리해서 값으로 도출 (초기값, BinaryOperator)
누적자를 초깃값으로 설정한 다음 BinaryOperator 로 스트림의 각 요소를 반복적으로 누적자와 합쳐 스트림을 하나의 값으로 리듀싱
//덧셈int sum =numvers.stream().reduce(0, Integer::sum) //.reduce(0, (a, b) -> a + b);// 곱셈int product =numvers.stream().reduce(1, (a, b) -> a * b);// 최댓값Optional<Integer> max =numbers.stream().reduce(Integer::max);
숫자형 스트림
mapToInt, mapToDouble, mapToLong 메서드는 map 과 같은 기능을 수행하지만 Stream 대신 특화된 스트림을 반환
int calories =menu.stream() //Stream<Dish> 반환.mapToInt(Dish:getCalories) //IntStream 반환.sum();
//null이 될 수 있는 객체를 포함하는 StreamStream<String> values =Stream.of("config","home","user").flatMap(key ->Stream.ofNullable(System.getProperty(key)));
배열로 스트림 생성
int[] numbers = {2,3,5,7,11,13,15,17};int sum =Arrays.stream(numbers).sum();
파일로 스트림 생성
Files.lines : 주어진 파일의 행 스트림을 문자열로 반환
//고유 단어의 수 계산long uniqueWords =0;try(Stream<String> lines =Files.lines(Paths.get("modernJavaInAction/data.txt"),Charset.defaultCharset())) { //AutoCloseable uniqueWords =lines.flatMap(line ->Arrays.stream(line.split(" "))) //고유 단어 수.distinct().count();} catch(IOException e) {}
무한 스트림 생성
Stream.iterate : 생산된 각 값을 연속적으로 계산
//(iterate) 짝수 스트림 생성 : 생산된 각 값을 연속적으로 계산Stream.iterate(0, n -> n +1).limit(10).forEach(System.out::println);//무한 스트림 생성을 중단하는 방법IntStream.iterate(0, n -> n<100, n -> n+4) .forEach(System.out::println);IntStream.iterate(0, n -> n+4) .takeWhile(n -> n<100).forEach(System.out::println);
Stream.generate : 생산된 각 값을 연속적으로 계산하지 않음 (상태가 없는 메서드에 주로 사용)
// Type & Calories Groupging (데이터가 없는 Type 도 포함)//{FISH=[], OTHER=[fries, rice], MEAT=[pork, beef, chichen]}Map<Dish.Type,List<Dish>> caloricDishedByType =menu.stream().collect(groupingBy(Dish::getType, filtering(dish ->dish.getCalories() >500, toList())));// 그룹의 각 요리를 관리 이름 목록으로 변환Map<Dish.Type,List<String>> dishNamesByType =menu.stream().collect(groupingBy(Dish::getType, mapping(Dish.getName, toList())));// 각 형식의 요리 태그 추출//{FISH=[roasted, tasty], OTHER=[fried, fresh], MEAT=[salty, greasy]}Map<Dish.Type,Set<String>> dishNamesByType =menu.stream().collect(groupingBy(Dish::getType, flatMapping(dish ->dishTags.get(dish.getName()).stream(), toSet())));
다수준 그룹화
publicenumCaloricLevel { DIET, NORMAL, FAT }Map<Dish.Type,Map<CaloricLevel,List<Dish>>> dishesByTypeCaloricLevel =menu.stream().collect(groupingBy(Dish::getType,// 첫 번째 수준의 분류 함수 groupingBy(dish -> { // 두 번째 수준의 분류 함수if (dish.getCalories() <=400) returnCaloricLevel.DIET;elseif (dish.getCalories() <=700) returnCaloricLevel.NORMAL;elsereturnCaloricLevel.FAT; }) ));//{FISH={DIET=[prawns], NORMAL=[salmon]}, MEAT={DIET=[chicken], NORMAL=[beef], FAT=[pork]}, ...}
서브그룹으로 데이터 수집
처음부터 값이 존재하지 않는 Key 는 Map 에 추가되지 않으므로 Optional wrapper 를 사용할 필요가 없음
groupingBy Collector 는 Stream 첫 번째 요소를 찾은 이후에 그룹화 맵에 새로운 키를 추가(lazy)
리듀싱 컬렉터는 절대 Optional.empty()를 반환하지 않음
collectingAndThen 는 적용할 컬렉터와 변환 함수를 인수로 받아 다른 컬렉터를 반환
//요리 수를 종류별로 연산//{MEATH=3, FISH=5, OTHER=2}Map<Dish.Type,Long> typesCount =menu.stream().collect(groupingBy(Dish::getType, counting()));//요리 종류 중 가장 높은 칼로리를 갖는 요리//{MEATH=Optional[pork], FISH=Optional[salmon], OTHER=Optional[buger]}Map<Dish.Type,Optional<Dish>> mostCaloricByType =menu.stream().collect(groupingBy(Dish::getType, maxBy(comparingInt(Dish::getCalories))));//{MEATH=pork, FISH=salmon, OTHER=buger}Map<Dish.Type,Dish> mostCaloricByType =menu.stream().collect(groupingBy(Dish::getType,//분류 함수 collectingAndThen( maxBy(comparingInt(Dish::getCalories)),// 감싸인 컬렉터 Optional::get) // 변환 함수 )); //각 요리 형식에존재하는 모든 CaloricLevel 값//{MEATH=[DIET, NORMAL], FISH=[NORMAL, FAT], OTHER=[DIET, NORMAL]}Map<Dish.Type,Set<CaloricLevel>> caloricLevelsByType =menu.stream().collect(groupingBy(Dish::getType, mapping(dish -> {if (dish.getCalories() <=400) returnCaloricLevel.DIET;elseif (dish.getCalories() <=700) returnCaloricLevel.NORMAL;elsereturnCaloricLevel.FAT; }, toCollection(HashSet::new))));
Partitioning
분할 함수
return Boolean (true or false)
분할 함수는 참, 거짓 두 가지 요소의 스트림 리스트를 모두 유지하는 장점이 있다.
프레디케이트를 스트림의 각 항목에 적용한 결과로 항목 분할
//{false=[pork, beef, salmon], true=[pizza, rice]}Map<Boolean,List<Dish>> partitionedMenu =menu.stream().collect(partitioningBy(Dish::isVegetarian));//{false={MEATH=[DIET, NORMAL], FISH=[NORMAL, FAT]}, true={OTHER=[DIET, NORMAL]}}Map<Boolean,Map<Dish.Type,List<Dish>>> vegetarianDishesByType =menu.stream().collect(partitioningBy(Dish::isVegetarian,//- 분할 함수 groupingBy(Dish::getType))); //- 두 번째 컬렉터//채식 요리와 채식이 아닌 요리 각 그룹에서 가장 칼로리가 높은 요리//{false=pork, true=pizza}Map<Boolean,Dish> mostCaloricPartitionedByVegetarian =menu.stream().collect(partitioningBy(Dish::isVegetarian, collectingAndThen(maxBy(comparingInt(Dish::getCalories)), Optional::get)));
/* T : 수집될 스트림 항목의 제네릭 형식 * A : 수집 과정에서 중간 결과를 누적하는 객체의 형식(누적자) * R : 수집 연산 결과 객체의 형식 * supplier() -> accumulator() -> combiner() -> finisher() */publicinterfaceCollector<T,A,R> {//supplier() : 새로운 결과 컨테이너 만들기(수집 과정에서 빈 누적자 인스턴스를 만드는 파라미터가 없는 함수)Supplier<A> supplier();//accumulator() : 결과 컨테이너에 스트림 요소 추가하기(리듀싱 연산을 수행하는 함수 반환)BiConsumer<A,T> accumulator();//combiner() : 두 결과 컨테이너 병합(스트림의 서로 다른 서브파트를 병렬로 처리할 때 누적자가 결과를 어떻게 처리할지 정의)BinaryOperator<A> combiner();//finisher() : 최종 변환값을 결과 컨터네이너 적용하기Function<A,R> finisher();// characteristics() : 컬렉터의 연산을 정의하는 Characteristics 형식의 불변 집합 반환(어떤 최적화를 이용해 리듀싱 연산을 수행할 것인지 힌트 제공)Set<Characteristics> characteristics();}
Example toList()
publicclassToListCollector<T> implementsCollector<T,List<T>,List<T>> { @OverridepublicSupplier<List<T>> supplier() { //<- 수집 연산의 시작return () ->newArrayList<T>(); //= return ArrayList::new; (생성자 참조) } @OverridepublicBiConsumer<List<T>,T> accumulator() { //<- 탐색한 항목을 누적하고 바로 누적자를 수정return (list, item) ->list.add(item); //= return LisT::add; } @OverridepublicFunction<List<T>,List<T>> finisher() { returnFcuntion.identity(); //<- 항등 함수 } @OverridepublicBinaryOperator<List<T>> combiner() {return (list1, list2) -> { //<- 두 번째 콘텐츠와 합쳐서 첫 번째 누적자 수정list1.addAll(list2); //<- 변경된 첫 번째 누적자 반환return list1; }; } @OverridepublicSet<Characteristics> characteristics() {/* UNORDERED: 리듀싱 결과는 스트림 요소의 방문 순서나 누적 순서에 영향을 받지 않음 * CONCURRENT: 다중 스레드에서 accumulator 함수를 동시에 호출할 수 있으며, 스트림의 병렬 리듀싱 수행 가능 * IDENTITY_FINISH: 리듀싱 과정의 최종 결과로 누적자 객체를 바로 사용 */returnCollections.unmodifiableSet(EnumSet.of(IDENTITY_FINISH, CONCURRENT)); }}
parallelStream() : 각각의 스레드에서 처리할 수 있도록 스트림 요소를 여러 청크로 분할한 스트림
내부적으로 ForkJoinPool 을 사용
//parallel() : 순차 스트림을 병렬 스트림으로//sequential() : 병렬 스트림을 순차 스트림으로publicLongparallelSum(long n) {returnStream.iterate(1L, i -> i +1).limit(n).parallel().reduce(0L, Long::sum);}
스트림 성능 측정
Java Microbenchmark Harness(JMH) 라이브러리를 통해 벤치마크 구현이 가능
// https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-core (핵심 JMH 구현 포함)implementation group:'org.openjdk.jmh', name:'jmh-core', version:'1.34'// https://mvnrepository.com/artifact/org.openjdk.jmh/jmh-generator-annprocess (JAR 파일 생성에 도움을 주는 어노테이션 프로세서 포함)testImplementation group:'org.openjdk.jmh', name:'jmh-generator-annprocess', version:'1.34'
함수 성능 측정
올바른 자료구조를 선택해야 병렬 실행도 최적의 성능을 발휘할 수 있다.
함수형 프로그래밍을 올바르게 사용하여 병렬 실행의 힘을 이용해보자.
@BenchmarkMode(Mode.AverageTime) //<- 벤치마크 대상 메서드 실행에 걸린 평균 시간 측정@OutputTimeUnit(TimeUnit.MILLISECONDS) //<- 벤치마크 결과를 밀리초 단위 출력@Fork(value =2, jvmArgs = { "-Xms4G","-Xmx4G" }) //<- 4GB 힙 공간을 제공한 환경에서 두 번의 수행을 통해 결과 신뢰성 확보publicclassParallelStreamBenchmark {privatestaticfinallong N =10_000_000L; @Benchmark//<- 벤치마크 대상 메서드publiclongsequentialSum() {returnStream.iterate(1L, i -> i +1).limit(N).reduce(0L, Long::sum); } @BenchmarkpubliclongparallelRangedSum() {/* iterate() 대신 LongStream.rangeClosed() * 기본형 long을 직접 사용하여 박싱, 언박싱 오버헤드 해결 * 청크로 분할할 수 있는 숫자 범위 생산 */returnLongStream.rangeClosed(1, N).parallel().reduce(0L, Long::sum); } @TearDown(Level.Invocation) //<- 매 벤치마크 실행 후 GC 동작 시도publicvoidtearDown() {System.gc(); }}
병렬 스트림 주의점
병렬화를 이용하려면, 스트림을 재귀적으로 분할해야 하고,
각 서브 스트림을 서로 다른 스레드의 리듀싱 연산으로 할당해야 하고,
이들 결과를 하나의 값으로 합쳐야 한다.
멀티 코어 간의 데이터 이동은 생각보다 비싸므로, 코어 간 데이터 전송 시간보다 훨씬 오래 걸리는 작업만 병렬로 처리하자.
또한, 병렬 스트림과 병렬 계산에서는 공유된 가변 상태를 피하자. 상태 공유에 따른 부작용
publicstaticlongsideEffectParallelSum(long n) {Accumulator accumulator =newAccumulator();LongStream.rangeClosed(1, n).parallel().forEach(accumulator::add);returnaccumulator.total;}publicstaticclassAccumulator {privatelong total =0;publicvoidadd(long value) { total += value; }}// 여러 스레드에서 동시에 누적자를 수정하면서 올바른 결과값이 나오지 않게 된다.System.out.println(ParallelStreams.sideEffectParallelSum(10_000_000L))
병렬 스트림 효과적으로 사용하기
확신이 서지 않으면 직접 측정하자.
순차 스트림과 병렬 스트림 중 어떤 것이 좋을지 모르겠다면 벤치마크로 성능을 측정해보자.
박싱을 주의하자.
자동 박식/언박싱은 성능을 크게 저하시킬 수 있는 요소다.
박싱 동작을 피하도록 되도록 기본형 특화 스트림을 사용해보자. (IntStream, LongStream, DoubleStream)
순차 스트림보다 병렬 스트림에서 성능이 떨어지는 연산이 있다.
요소의 순서에 의존(limit, findFirst)하는 연산은 병렬 스트림에서 더 비싼 비용이 들어간다.
스트림에서 수행하는 전체 파이프라인 연산 비용을 고려하자.
N(처리해야 할 요소 수) * Q(하나의 요소 처리 비용)
Q가 높아지는 것은 병렬 스트림으로 성능 개선 가능성이 있음을 의미한다.
소량의 데이터에서는 병렬 스트림이 도움이 되지 않는다.
소량의 데이터는 병렬화 과정에서 생기는 부가 비용을 상쇄할 만큼의 이득을 얻지 못한다.
스트림을 구성하는 자료구조가 적절한지 확인하자.
ex. ArrayList를 LinkedList 보다 효율적으로 분할할 수 있다.
스트림의 특성과 파이프라인의 중간 연산이 스트림의 특성을 어떻게 바꾸는지에 따라 분해 과정의 성능이 달라질 수 있다.
Sized 스트림은 정확히 같은 크기의 두 스트림으로 분할 가능하여 효과적이지만, 필터 연산은 스트림 길이 예측이 불가하여 효과적인지 알 수 없다.
최종 연산의 병합 과정 비용을 살펴보자.
병합 과정 비용이 비싸다면 병렬 스트림으로 얻은 성능 이익이 상쇄
소스
분해성
ArrayList
Excellence
LinkedList
Bad
IntStream.range
Excellence
Stream.iterate
Bad
HashSet
Good
TreeSet
Good
포크/조인 프레임워크
병렬 스트림을 제대로 사용하기 위해 병렬 스트림 내부 구조를 살펴보자.
병렬화할 수 있는 작업을 재귀적으로 작은 작업으로 분할한 후, 서브태스크 각각의 결과를 합쳐 전체 결과를 만들도록 설계
서브태스크를 스레드 풀(ForkJoinPool)의 작업자 스레드에 분산 할당하는 ExecutorService Intergace 구현
스레드 풀을 이용하려면, RecursiveTask의 서브클래스를 만들어야 한다.
포크/조인 프레임워크의 알고리즘은 분할/정복 알고리즘의 병렬화 버전이다.
포크/조인 프레임워크 제대로 사용하기
join 메서드를 태스크에 호출하면 태스크가 생산하는 결과가 준비될 때까지 호출자를 블록시킨다.
RecursiveTask 내에서는 ForkJoinPool의 invoke 메서드를 사용하지 말자.
대신 compute나 fork 메서드 직접 호출하고, 순차 코드에서 병렬 계산을 시작 시에만 invoke 사용
서브태스크에서 fork 메서드를 호출해서 ForkJoinPool의 일정 조절
한 쪽 작업에는 fork, 다른 한 쪽 작업에는 compute 를 호출하자.
두 서브 태스크의 한 태스크에는 같은 스레드를 재사용할 수 있다.
포크/조인 프레임워크를 이용하는 병렬 계산은 디버깅이 어렵다.
멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠른 것은 아니다.
포크/조인 프레임워크의 작업 훔치기
포크/조인 프레임워크의 병렬 처리 방법
풀에 있는 작업자 스레드의 태스크를 재분배하고 균형을 맞출 때 작업 훔치기 알고리즘을 사용
작업 훔치기 알고리즘 : 스레드는 자신에게 할당된 태스크를 포함하는 이중 연결 리스트를 참조하며 작업이 끝날 때마다 큐의 헤드에서 다른 태스크를 가져와 작업을 처리 -> 할 일이 없어진 스레드는 유휴 상태로 바뀌는 것이 아니라 (모든 큐가 빌 때까지) 다른 스레드 큐의 꼬리에서 작업을 훔쳐온다.
따라서, 태스크 크기를 작게 나누어야 작업자 스레드 간의 작업 부화를 비슷한 수준으로 유지할 수 있다.