반응형

1. 서론

이번 포스팅에서는 Chapter11의 null 대신 Optional 클래스에 대해 진행하도록 하겠습니다.

 

2. 값이 없는 상황을 어떻게 처리할까?

책에서는 null 관련하여 아래 예제를 말합니다.

 

public class Person {
    private Car car;

    public Car getCar() {
        return car;
    }
}
    
public class Car {
    private Insurance insurance;

    public Insurance getInsurance() {
        return insurance;
    }
}
    
public class Insurance {
    private String name;
    
    public String getName() {
        return name;
    }
}

public String getCarInsuranceName(Person person) {
    return person.getCar().getInsurance().getName();
}

 

위의 코드는 Person이 차를 가지고 있지 않는 경우에는 NullPointerException 이 발생하게 됩니다.

 

1) 보수적인 자세로 NullPointerException 줄이기

 

위 예제에서 null를 예방하기 위해 if-else 를 사용하게 된다면 아래와 같은 코드가 될 것입니다.

 

public String getCarInsuranceName(Person person) {
    if (person != null) {
        Car car = person.getCar();
        if (car != null) {
            Insurance insurance = car.getInsurance();
            if (insurance != null) {
                return insurance.getName();
            }
        }
    }
    return "Unknown";
}

 

한눈에 봐도 코드가 난잡하며, 객체의 연관도가 깊을수록 if의 깊이는 증가됩니다.

 

가끔 깊이가 너무 깊어져 이를 예방하기 위해 아래와 같은 코드가 나올수도 있습니다.

 

public String getCarInsuranceName(Person person) {
    if (person == null) {
        return "Unknown";
    }
    Car car = person.getCar();
    if (car == null) {
        return "Unknown";
    }
    Insurance insurance = car.getInsurance();
    if (insurance == null) {
        return "Unknown"; 
    }
    
    return insurance.getName();
}

 

 

이 코드의 경우에는 깊이는 깊어지지 않지만 너무 많은 return 출구가 있어 유지보수가 어렵다는 단점을 가지게 됩니다.

 

 

 

2) null 때문에 발생하는 문제

 

위와 같이 null로 인해서 발생하는 문제로는 아래와 같습니다.

 

  1. 에러의 근원 : NullPointerException 은 자바에서 가장 흔히 발생하는 에러입니다.
  2. 코드를 어지럽힘 : null 체크를 통해 코드가 난잡하고 어지럽게 됩니다.
  3. 아무 의미가 없음 : null은 아무 의미도 표현하지 않으며, 이는 값이 없음을 표현하기에 부적합 합니다.
  4. 자바 철학에 위배된다 : 자바의 경우, 개발자에게 모든 포인터를 숨겼지만 null은 유일하게 포인터를 숨길 수 없었습니다.
  5. 형식 시스템에 구멍을 만든다 : null은 무형식이며 정보를 포함하고 있지 않으므로 모든 참조 형식에 null을 할당 할 수 있고, 이는 결국 위험한 코드를 만들게 됩니다.

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

3. Optional 클래스 소개

 

자바 8 에서는 null 문제를 해결하기 위해 Optional<T> 라는 클래스를 제공합니다.

 

이는 사실상 단순히 null를 위한 Wrapper 클래스입니다.

 

아래는 Optional 를 간단히 보여주는 그림입니다.

 

 

 

모든 null 참조를 Optional로 대치하는것은 바람직하지 않습니다.

Optional은 객체에 대해서 null 체크, null 인 경우 대처를 어떻게 할지를 도와주는 역할입니다.

 

때문에, Optional를 사용한다고 NullPointerException이 나지 않는것은 아닙니다.

 

4. Optional 적용 패턴

 

1) Optional 객체 만들기

 

Optional 도 클래스이기 때문에 객체를 생성해서 사용해야 합니다.

 

1. 빈 Optional

 

아래와 같이 빈 Optional 객체를 생성할 수 있습니다.

 

Optional<Car> optCar = Optional.empty();

 

2. null이 아닌 값으로 Optional 만들기

 

아래와 같은 기존에 있는 객체로 Optional 객체를 만들 수도 있습니다.

 

Optional<Car> optCar = Optional.of(car);

 

단, 이 경우 인자인 car가 null인 경우 NullPointerException이 발생합니다.

 

 

3. null 값으로 Optional 만들기

 

아래와 같은 방법으로도 Optional 객체를 만들 수 있습니다.

 

Optional<Car> optCar = Optional.ofNullable(car);

 

2번의 of 메서드와의 차이점으로는 car가 null인 경우 NullPointerException 가 아닌 빈 Optional을 반환한다는 점입니다.

 

 

2) 맵으로 Optional의 값을 추출하고 변환하기

 

Optional 클래스는 스트림 메서드와 비슷한 map 메서드를 지원합니다.

 

아래와 같이 map 을 사용할 수 있습니다.

 

Optional<Insurance> optInsurance = Optional.ofNullable(insurance);
Optional<String> name = optInsurance.map(Insurance::getName);

 

만약, Optional이 비어있으면 아무일도 일어나지 않습니다.

 

 

3) flatMap으로 Optional 객체 연결

 

스트림 메서드의 flatMap 비슷한 기능의 메서드도 제공하고 있습니다.

 

Optional의 flatMap도 스트림 메서드와 동일하게 Optional<Optional<T>> 와 같은 depth 가 생기는 것을 Optional<T> 로 평준화 해주는 메서드입니다.

 

 

4) Optional로 자동차의 보험회사 이름 찾기

 

그럼 이제 배운 Optional를 사용하여 예제를 해결하게 되면 아래와 같은 코드가 나오게 됩니다.

 

public class Person {
    private Car car;
    public Optional<Car> getCarAsOptional() {
        return Optional.ofNullable(car);
    }
}

public class Car {
    private Insurance insurance;
    public Optional<Insurance> getInsuranceAsOptional() {
        return Optional.ofNullable(insurance);
    }
}

public class Insurance {
    private String name;
    public String getName() {
        return name;
    }
}

public String getCarInsuranceName(Person person) {
    return Optional.of(person)
            .flatMap(Person::getCarAsOptional)
            .flatMap(Car::getInsuranceAsOptional)
            .map(Insurance::getName)
            .orElse("other value");
}

 

Optional 클래스는 필드 형식으로 사용할 것을 가정하지 않아, Serializable 인터페이스를 구현하지 않습니다.
때문에, 위와 같이 get 메서드에만 Optional 를 사용하는것을 권장합니다.

 

5) Optional 스트림 조작

 

자바 9 에서는 Optional 의 스트림 처리를 제공하기 위해 Optional에 stream() 메서드를 제공합니다.

 

아래는 Optional의 stream 함수를 사용한 예제입니다.

 

public Set<String> getCarInsuranceNames(List<Person> persons) {
    return persons.stream()
            .map(Person::getCarAsOptional)
            .map(optCar -> optCar.flatMap(Car::getInsuranceAsOptional))
            .map(optIns -> optIns.map(Insurance::getName))
            .flatMap(Optional::stream)
            .collect(Collectors.toSet());
}

 

stream 메서드를 지원함으로 위와 같이 더욱 간단하게 null 처리를 할 수 있게 되었습니다.

 

하지만, 마지막 결과를 얻기 위해서는 빈 Optional 은 제거하고 있는것은 언랩해야 하는 문제가 있습니다.

 

이는 위 예제에서 Optional::stream 메서드로 해결할 수 있습니다.

stream 메서드는 값이 있는 것 만을 Stream에 담아서 전달하기 때문입니다.

 

 

6) 디폴트 액션과 Optional 언랩

 

아래는 Optional 클래스가 가지고 있는 디폴트 액션입니다.

 

디폴트 액션 설명
get Optional 의 값을 가져오는 메서드입니다.
만약, 값이 없다면 NoSuchElementException이 발생하기 때문에 위험한 메서드입니다.
orElse(T other) orElse는 값이 없는경우 인자인 other를 반환합니다.
orElseGet(Supplier<? extends T> other) orElseGet는 orElse의 게으른 버전입니다.
값이 없는경우에서야 Supplier 를 수행하여 값을 반환하기 때문입니다.
orElseThrow(Supplier<? extends T> exceptionSupplier) orElseThrow는 값이 없는 경우 예외를 발생합니다.
ifPresend(Consumer<? super T> consumer) ifPresend는 값이 존재할때만, 인자의 Consumer를 수행합니다.
ifPresendOrElse(Consumer<? super T> action, Runnable emptyAction) ifPresendOrElse는 자바 9에서 추가된 메서드로,
위의 ifPresent와의 차이점으로는 값이 비어있는 경우 Runnable 인자를 실행한다는 점입니다.

 

7) 필터로 특정값 거르기

 

Optional은 filter 메서드를 지원하고 있습니다.

 

이 메서드는 프레디케이트를 인자로 받으며, Optional 객체가 값을 가지고 있는 경우 프레디 케이트를 적용하고, 값이 없는경우에는 빈 Optional를 반환합니다.

프레디 케이트 적용 결과가 false의 경우에도 빈 Optional 을 반환합니다.

 

아래는 filter 사용 예제 입니다.

 

Optional<Insurance> optInsurance = Optional.ofNullable(insurance);
optInsurance.filter(insurance -> "CabridgeInsurance".equals(insurance.getName()))
            .ifPresent(x -> System.out.println("ok"));

 

6. 마무리

 

이번 포스팅에서는 Chapter11 null 대신 Optional 클래스에 대해 진행하였습니다.

다음에는 Chapter12 새로운 날짜와 시간 API에 대해 포스팅하겠습니다.

 

반응형
반응형

1. 서론

이번 포스팅에서는 앞 포스트에서 설치한 CDH와 개별 어플리케이션과의 연동 방법에 대해 알아 보도록하겠습니다.

 

2. 스파크 클라이언트

MR, Spark 모두 Job을 CDH 서버에서 제출하여 수행해도 됩니다.

 

하지만 일반적으로, CDH 서버군들은 YARN, HDFS, SPARK 등 처리와 관리를 위한 용도로만 프로세스를 띄우게 하고

별도 클라이언트 어플리케이션을 수행하는 서버군이 있습니다.

 

그리고, 클라이언트는 Job을 YARN에게 제출하여 CDH에서 작업을 처리하도록 한 후 결과를 받습니다.

 

아래는 간단한 클라이언트와 CDH 간의 그림입니다.

 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

 

3. CDH와 연동

 

클라이언트 서버에서 원하는 서비스들을 별도로 설치하여 사용할 수 있습니다.

 

하지만, 클라이언트에 CDH의 각 서비스 설정을 옮겨야 하는 번거로움이 있으며 실수할 수 있는 포인트가 생깁니다.

또한, 클라이언트와 CDH의 서비스 버전이 상이한것보다는 동일하게 운영 시 위험성이 낮습니다.

여기서 서비스라고 하는 것들은 Hdfs, Yarn, Spark 등등을 의미합니다.

 

저는 위와 같은 이유로 CDH 에서 한 서버를 선택하여 rsync를 통해 설정 정보를 모두 가져오도록 했습니다.

 

방법은 아래와 같습니다.

 

1) 터널링

 

먼저 CDH 중 서버 한개와 클라이언트 서버와 터널링을 진행합니다.

 

아래와 같이 ssh-keygen을 통해 클라이언트 서버에 public key를 생성합니다.

 

ssh-keygen
cd ~/.ssh
cat id_rsa.pub

 

 

cat으로 출력 된  public key를 CDH 서버 한대의 .ssh 디렉터리로 이동하여 authorized_keys 파일에 추가해줍니다.

 

이제 수동으로 클라이언트 서버에서 CDH 서버로 ssh 연결을 한번 수행 해줍니다.

 

ssh CDH-10-server

 

 

2) rync

 

CDH의 경우 /opt/cludera/parcels 로 설정 정보와 lib 등등 모든 정보를 놓습니다.

 

하지만, 루트 디렉터리에 용량이 꽉차 disk full 이 날 수 있으니 아래와 같이 큰 disk가 마운트 되어 있는 디렉터리로 심볼릭링크를 걸어 줍니다.

 

sudo mkdir -p /home1/irteamsu/opt/cloudera
sudo ln -s /home1/irteamsu/opt/cloudera /opt/cloudera

 

그리고 이제 rync를 통하여 데이터를 가져와 줍니다.

 

 

3) PATH 설정

 

다 가져오셨다면 이제 PATH 설정을 해주기 위해 아래와 같이 bash_profile 을 수정합니다.

 

vim ~/.bash_profile

 

.bash_profile 파일을 vim 에디터로 엽니다.

 

아래와 같이 설정을 추가합니다.

 

HADOOP_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554
SPARK_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark
export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/etc/hadoop
export PATH=$HADOOP_HOME/bin::$SPARK_HOME/bin:$PATH

 

CDH 버전이 다르다면 위 경로를 자신의 버전에 맞게 바꿔서 사용하시면 됩니다.

 

마지막으로 아래와 같이 source 명령어를 통해 .bash_profile 의 내용을 반영합니다.

 

source ~/.bash_profile

 

4) 확인

 

아래와 같이 hdfs 명령어와 spark-shell 명령어가 정상 동작하는것을 확인할 수 있습니다.

 

추가로 hdfs 명령어 시 CDH에 있는 hdfs 에 있는 디렉터리가 나오는것을 볼 수 있습니다.

 

 

위와 같은 방법으로 클라이언트를 만드는 것의 단점이 있습니다.

 

바로 rsync로 지정한 디렉터리에 있는 모든 설정을 다 가져온다는 것 입니다.

클라이언트 서버는 spark를 사용하지 않는데도 spark 설정이 들어오게 되는 것 처럼요.

 

또한, 클라이언트가 추가 될 때마다 터널링을 수행해야 한다는 점입니다.

 

이러한 문제점들은 사실상 파일 서버를 두어 설정 파일들을 중앙 관리하도록 한다면 크게 문제가 되지 않을 것입니다.

 

지금은 예제를 위해 간단히 rsync를 통해 가져오도록 해보았습니다.ㅎㅎ

 

4. 마무리

 

이렇게, Spark에 대한 포스팅을 완료했습니다.

 

다음 포스팅에서는 Hdfs기반 Nosql로 많이 사용하는 Hbase에 대해 진행하도록 하겠습니다.

반응형

'BigData > Spark' 카테고리의 다른 글

(7) Apache Zeppelin  (0) 2020.04.20
(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
반응형

1. 서론

이번 포스팅에서는 Apache Zeppelin 대해 알아 보도록하겠습니다.

 

2. Apache Zeppelin 이란?

Apache Zeppelin이란 Spark를 Web 기반 NoteBook으로 간편히 수행할 수 있게 제공하는 어플리케이션입니다.

 

ide를 통해 할 수도 있지만 Web 기반으로 어디서든 접근하여 간편히 Spark 로직을 생산할 때 많이 사용하는 도구입니다.

 

3. 설치 및 CDH와 연동

CDH ( = Cloudera's Distribution for Hadoop )에서 사용하는 CM (= Cloudera Manager)에서는 안타깝게도 Apache Zeppelin 설치를 제공하고 있지 않습니다.

 

HDP ( = Hortonworks Data Platform ) 에서 사용하는 Ambari 에서는 지원하고 있습니다.
Ambari에서 설치 법은 아래 URL에 나와있습니다.
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/installing-zeppelin/content/installing_apache_zeppelin.html

 

때문에, 수동으로 설치하여 CDH와 연동하여 사용하여야 합니다.

 

 

1) 다운로드

 

저는 CDH 중 호스트 한개를 선택하여 설치를 진행하였습니다.

 

먼저 아래와 같이 zeppelin 을 다운받습니다.

( 현재 포스팅 시점에서는 zeppelin-0.9.0-preview1 가 최신 버전이지만 많이 사용하는 0.8.1 버전으로 진행하도록 하겠습니다. )

 

wget http://apache.mirror.cdnetworks.com/zeppelin/zeppelin-0.8.1/zeppelin-0.8.1-bin-all.tgz

 

다운받은 압축파일을 해제합니다.

 

tar -zxvf zeppelin-0.8.1-bin-all.tgz

 

Zeppelin의 경우 기본경로를 /opt/zeppelin으로 잡기 때문에 압축해제한 폴더를 해당 경로로 옮겨줍니다.

 

sudo mkdir -p /opt/zeppelin
sudo mv ./zeppelin-0.8.1-bin-all /opt/zeppelin

 

 

2) 설정 파일 수정

 

먼저 /opt/zeppelin/zeppelin-0.8.1-bin-all/conf 디렉터리로 이동합니다.

후에, 아래 3개 파일을 모두 cp 명령어를 통해 postfix .template를 제거한 파일을 만들어 줍니다.

 

cp shiro.ini.template shiro.ini
cp zeppelin-env.sh.template zeppelin-env.sh
cp zeppelin-env.cmd.template zeppelin-env.cmd

 

3개 파일을 모두 수정할건 아니지만 사용자가 필요시에 수정할 파일이기 때문에 미리 cp를 통해 만들어 주었습니다.

 

이제 zeppelin-env.sh 파일을 열어 아래와 같이 설정 해주시면 됩니다.

 

설정 값을 CDH의 parcels를 통해 설치한 경로로 잡아줘야 합니다.
제플린을 통해 스파크 코드가 CDH 노드들로 수행되기 위함입니다.

 

export JAVA_HOME="/usr/java/jdk1.8.0_181-cloudera"
export SPARK_HOME="/opt/cloudera/parcels/CDH/lib/spark"
export HADOOP_CONF_DIR="/etc/hadooc/conf"

 

기본 설정은 이걸로 끝입니다ㅎㅎㅎ

 

3) 실행

 

이제 bin 디렉터리인 /opt/zeppelin/zeppelin-0.8.1-bin-all/bin

으로 이동 후 zeppelin-daemon.sh start 명령어만 수행해주면 됩니다.

 

cd /opt/zeppelin/zeppelin-0.8.1-bin-all/bin
./zeppelin-daemon.sh start

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

4. 사용 예제

이제 위까지 진행하셨다면 기본 포트인 8080으로 접속하시면 아래와 같은 화면을 볼 수 있습니다.

 

 

1) 로그인하기

 

로그인은 앞에서 봤던 shiro.ini 파일에 각 인증, 권한을 허용하는 유저를 등록할 수 있습니다.

앞에서는 딱히 건들지 않았기 때문에 아래와 같은 기본 유저를 사용하시면 됩니다.

 

 

저는 첫번째인 user1 에 password2로 로그인하였습니다.

 

2) 노트 생성하기

 

로그인을 했으니 아래 사진의 create new note 를 클릭하여 추가합니다.

 

저는 feeder라는 디렉터리에 compare를 추가했습니다.

 

 

3) pyspark 코드 실행

 

생성한 노트에 들어가면 아래와 같은 화면이 나옵니다.

 

 

pyspark를 사용하기 위해 %spark.pyspark 를 맨 상단에 추가해주시면 됩니다.

 

아래는 간단하게 rdd를 만들어서 출력해보는 예제입니다.

 

 

5. 마무리

 

이번에는 Apache Zeppelin 에 대해서 설치와 간단한 사용 예제를 포스팅하였습니다.

다음에는 CDH와 스파크 클라이언트 연동에 대해 포스팅하겠습니다.

반응형

'BigData > Spark' 카테고리의 다른 글

(8) CDH와 스파크 클라이언트 연동  (0) 2020.04.21
(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
반응형

1. 서론

 

이번 포스팅에서는 Hbase가 제공하는 클라이언트 API 중 고급 기능에 대해 알아보겠습니다.

 

2. 필터

Hbase에서는 Get 말고도 필터를 이용하여 데이터를 조회하여 가져올 수 있습니다.

 

1) 필터 소개

 

Hbase 클라이언트는 Filter라는 추상클래스와 여러가지 구현클래스를 제공하고 있습니다.

 

또한, 개발자는 직접 Filter 추상 클래스를 구현하여 사용할 수 있습니다.

 

모든 필터는 실제로 서버측에 적용이 되어 수행되어 집니다.

클라이언트에서 적용되는 경우에는 많은 데이터를 가져와 필터링해야 하기 때문에 대규모 환경에서는 적합하지 않습니다.

 

아래는 필터가 실제로 어떻게 적용되는지 보여줍니다.

 

 

클라이언트에서 필터 생성 -> RPC로 직렬화한 필터 서버로 전송 -> 서버에서 역직렬화하여 사용

 

 

2) 필터의 계층 구조

 

Filter 추상클래스가 최상단이며 추상클래스를 상속받아 뼈대를 제공하는 추상클래스로 FilterBase가 있습니다.

Hbase에서 제공하는 구현 클래스들은 FilterBase를 상속하고 있는 형태입니다.

 

 

3) 비교 필터

 

Hbase가 제공하는 필터 중 비교연산을 지원하는 CompareFilter가 있습니다.

 

생성자 형식은 아래와 같습니다.

 

CompareFilter(final CompareOperator op, final ByteArrayComparable comparator)

 

 

CompareOperator 에는 [LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER, NO_OP] 가 있습니다.

ByteArrayComparable 는 추상 클래스로 compareTo 추상 메서드를 가지고 있습니다.

 

 

4) 로우 필터 - RowFilter

 

로우 필터는 로우 키 기반으로 데이터를 필터링 할 수 있도록 제공하고 있습니다.

 

아래는 예제 코드입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-0"));

Filter filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
ResultScanner scanner1 = hTable.getScanner(scan);
scanner1.forEach(System.out::println);
scanner1.close();

Filter filter2 = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(".*-.5"));
scan.setFilter(filter2);
ResultScanner scanner2 = hTable.getScanner(scan);
scanner2.forEach(System.out::println);
scanner2.close();

Filter filter3 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("-5"));
scan.setFilter(filter3);
ResultScanner scanner3 = hTable.getScanner(scan);
scanner3.forEach(System.out::println);
scanner3.close();

 

위 예제에서는 아래 필터들을 사용하는 것을 볼 수 있습니다.

 

  • filter1 : 지정한 로우키에 대해서 사전편찬식으로 저장되는 로우들을 이용하여 필터
  • filter2 : 정규표현식을 이용하여 필터
  • filter3 : 부분 문자열을 이용하여 필터.

 

5) 패밀리 필터 - FamilyFilter

 

패밀리 필터의 경우 로우 필터와 동작방식이 비슷하지만 로우 키가 아닌 로우 안의 컬럼패밀리를 대상으로 비교합니다.

 

아래는 예제 코드입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));

Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));

Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner = hTable.getScanner(scan);
scanner.forEach(System.out::println);
scanner.close();

Get get1 = new Get(Bytes.toBytes("row-5"));
get1.setFilter(filter1);
Result result1 = hTable.get(get1);
System.out.println("Result of get(): " + result1);

Filter filter2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("colfam3")));
Get get2 = new Get(Bytes.toBytes("row-5"));
get2.addFamily(Bytes.toBytes("colfam1"));
get2.setFilter(filter2);
Result result2 = hTable.get(get2);
System.out.println("Result of get(): " + result2);

 

위 예제 볼 수 있듯이 Filter는 Get, Scan 두개에 setFilter 메서드를 통해 적용 가능합니다.

 

또한, 컬럼 패밀리도 사전 편찬식으로 저장되는것을 이용하는것을 볼 수 있습니다.

 

 

6) 퀄리파이어 필터 - QualifierFilter

 

퀄리파이어 필터는 말 그대로 퀄리파이어를 대상으로 비교하는 필터입니다.

 

아래는 예제 코드입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));

Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2")));
        
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = hTable.getScanner(scan);
scanner.forEach(System.out::println);
scanner.close();
        
Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = hTable.get(get);
System.out.println("Result of get() : " + result);

 

7) 값 필터

 

이번에는 값을 대상으로 비교하는 필터입니다.

 

아래는 예제입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));

Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4"));
        
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = hTable.getScanner(scan);
scanner.forEach(result -> {
    Arrays.asList(result.rawCells()).forEach(System.out::println);
});
scanner.close();
        
Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = hTable.get(get);
Arrays.asList(result.rawCells()).forEach(System.out::println);

 

 

8) 의존 컬럼 필터 - DependentColumnFilter

 

의존 컬럼 필터의 경우 단순 필터링이 아닌 더 복잡한 필터 기능을 제공합니다.

이 필터는 다른 컬럼이 필터링될지 여부를 결정하는 의존 컬럼을 지정합니다.

 

아래는 의존 컬럼 필터의 생성자 입니다.

 

DependentColumnFilter(
    final byte [] family, 
    final byte[] qualifier,
    final boolean dropDependentColumn, 
    final CompareOperator op,
    final ByteArrayComparable valueComparator
)

 

9) 단일 컬럼값 필터 - SingleColumnValueFilter

 

단일 컬럼값 필터는 특정 컬럼과 값에 대해 비교 필터로 사용합니다.

 

생성자는 아래와 같습니다.

 

SingleColumnValueFilter(
  final byte [] family, 
  final byte [] qualifier,
  final CompareOperator op, 
  final byte[] value
)

SingleColumnValueFilter(
  final byte [] family, 
  final byte [] qualifier,
  final CompareOperator op,
  final ByteArrayComparable comparator
)

 

또한 추가로 아래와 같은 메소드를 통해 미세 조정이 가능합니다.

 

boolean getFilterIfMissing()
void setFilterIfMissing(boolean filterIfMissing)
boolean getLatestVersionOnly()
void setLatestVersionOnly(boolean latestVersionOnly)

 

10) 단일 컬럼값 제외 필터 - SingleColumnValueExcludeFilter

 

이 필터는 위의 단일 컬럼값 필터의 반대 기능의 필터로 보시면 됩니다.

 

즉, 생성자에 전달한 참조 컬럼이 결과값에서 제외 되어집니다.

 

 

11) 접두어 필터 = PrefixFilter

 

접두어 필터의 경우 이름 그대로 인스턴스화 할때 지정한 접두어와 일치하는 모든 로우를 반환합니다.

 

아래는 생성자 코드입니다.

 

PrefixFilter(final byte [] prefix)

 

이 필터의 경우 scan에 setting하여 사용할때 유의미하며, 지정한 접두어보다 큰 로우키를 만났을때는 알아서 종료되어

불필요한 탐색작업이 일어나지 않도록 되어 있습니다.

 

이 또한 Hbase의 사전 편찬식으로 정렬되어 있기 때문에 가능한 동작입니다.

 

 

12) 페이지 필터 - PageFilter

 

이 필터를 이용하면 로우 단위로 페이징 기능을 제공합니다.

 

인스턴스를 생성할 때 pageSize 파라미터를 지정하여 페이지당 몇 개의 로우를 반환할지 지정할 수 있습니다.

 

클라이언트 코드에서는 반환 받은 마지막 로우를 기억하고 있다가, 다음 이터레이션을 시작할때 시작 로우로 설정하여 사용할 수 있습니다.

 

이 필터는 시작 로우를 포함하므로 다음 페이징을 할때는 마지막 로우에 0byte를 추가하여 시작 로우로 설정하면 됩니다.

0byte는 증가시킬수 있는 최소한의 값이므로 안전하게 스캔 범위를 재설정할 수 있습니다.
또한 0byte 추가된 로우가 실제로 있더라도 페이지 필터는 시작 로우를 포함하므로 문제가 되지 않습니다.

 

아래는 예제 코드입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));
        
Filter filte = new PageFilter(15);
final byte[] POSTFIX = Bytes.toBytes(0);
        
int totalRows = 0;
byte[] lastRow = null;
while(true) {
    Scan scan = new Scan();
    scan.setFilter(filte);
    
    if(lastRow != null) {
        byte[] startRow = Bytes.add(lastRow, POSTFIX);
        System.out.println("start row : " + Bytes.toStringBinary(startRow));
        scan.setStartRow(startRow);
    }

    ResultScanner scanner = hTable.getScanner(scan);
    int localRows = 0;
    Result result;
    while((result = scanner.next()) != null) {
        System.out.println(localRows++ + ":" + result);
        totalRows++;
        lastRow = result.getRow();
    }
    scanner.close();
    
    if(localRows == 0) break;
}
System.out.println("total rows : " + totalRows);

 

책에는 나와 있지 않지만, 물리주소가 아닌 로우키를 기준으로 페이징을 하여 시점에 따라 페이징 결과값은 달라질 수 있을 것으로 예상되어 집니다.
실제 서비스에서는 이 부분을 고려하여  적용해야 합니다.

 

13) 키 전용 필터 

 

이 필터는 데이터의 키 정보에만 접근하고 값 정보는 사용하지 않는 경우 사용합니다.

 

 

14) 최초 키 전용 필터

 

각 로우에서 Hbase 내부적으로 정렬된 첫번째 컬럼에 접근하는 기능을 제공합니다.

 

이러한 필터는 대체적으로 로우의 갯수를 셀 때 사용하곤 합니다.

 

 

15) 종료 로우 포함 필터 - InclusiveStopFilter

 

보통 필터들의 경우 시작 로우는 포함하나 종료 로우는 포함하지 않습니다.

하지만, 종료로우도 포함하기를 원할때는 이 필터를 사용하면 됩니다.

 

 

16) 타임스탬프 필터 - TimestampsFilter

 

스캔 결과에 버전 단위까지 미세조정하려면 타임스탬프 필터를 사용하면 됩니다.

 

아래는 생성자 코드입니다.

 

TimestampsFilter(List<Long> timestamps)

 

인자로 타임스탬프 리스트를 받는것을 볼 수 있습니다.

 

이 리스트에 포함된 타임스탬프와 동일한 결과만 반환받게 됩니다.

또한 scan의 경우 자체적으로 setTimeRange 메서드를 통해 범위를 지정할 수 있는데 타임스탬프 필터도 적용하게 되면

지정한 범위 안에서 필터에 지정한 버전과 동일한 값들만을 반환하게 됩니다.

 

 

17) 컬럼 개수 제한 필터 - ColumnCountGetFilter

 

로우 당 지정한 최대 개수만큼의 컬럼만 반환받는 필터입니다.

 

생성자는 아래와 같습니다.

 

ColumnCountGetFilter(final int n)

 

로우에서 설정된 최대 갯수만큼의 컬럼이 발견되면 전체 스캔을 중단하기 때문에 그다지 유용한 필터는 아닙니다.

 

 

18) 컬럼 페이지 필터 - ColumnPaginationFilter

 

페이지 필터와 비슷하지만, 이 필터는 한 로우 안의 컬럼을 대상으로 페이징을 제공합니다.

 

아래는 생성자 입니다.

 

ColumnPaginationFilter(final int limit, final int offset)

 

 

19) 컬럼 접두어 필터 - ColumnPrefixFilter

 

이 필터는 컬럼을 대상으로 지정한 접두어가 있는 값들을 필터링합니다.

 

생성자는 아래와 같습니다.

 

ColumnPrefixFilter(final byte [] prefix)

 

20) 스킵 필터 - SkipFilter

 

스킵 필터는 보조 필터로서 다른 필터를 감싼 형태로 동작합니다.

감싸진 필터가 건너뛸 인스턴스에 대한 단서를 제공하면 그 전체 로우를 제외하게 됩니다.

 

 

21) 스캔 중단 필터 - WhileMatchFilter

 

스캔 중단 필터도 보조 필터로서 하나라도 필터링되는 순간 전체 스캔을 중단하도록 합니다.

 

 

22) 필터 리스트 - FilterList

 

위에서 살펴본 필터들을 중첩하여 사용하고 싶을 수 있습니다.

이를 위해, 사용하는것이 필터 리스트입니다.

 

생성자는 아래와 같습니다.

 

FilterList(final List<Filter> filters)
FilterList(final Operator operator)
FilterList(final Operator operator, final List<Filter> filters)

 

filters에는 적용할 필터의 리스트를 의미합니다.

operator는 필터 결과를 어떻게 만들지 지정합니다.

 

아래는 Operator 값 입니다.

 

연산자 설명
MUST_PASS_ALL 모든 필터를 통과한 값만이 결과에 추가 ( = AND)
MUST_PASS_ONE 필터 중 하나라도 통과 한 값은 결과에 추가 ( = OR)

 

생성자 말고도 필터를 추가해야 할때는 아래 메소드를 사용하면 됩니다.

 

void addFilter(Filter filter)
void addFilter(List<Filter> filters)

 

또한, List 의 구현체에 따라 필터의 순서를 정할 수 있습니다.

ArrayList의 경우 담긴 순서대로 필터가 적용되는것을 보장합니다.

 

 

23) 사용자 정의 필터

 

지금까지는 Hbase 클라이언트가 제공하는 필터 종류에 대해 알아보았습니다.

 

하지만, 제공하는 필터가 아닌 사용자가 만든 Custom 한 필터가 필요한 경우가 있습니다.

 

이런경우에는 Filter 혹은 FilterBase 추상 클래스를 상속받아 만들 수 있습니다.

 

 

 

 

 

반응형

 

 

 

 

3. 카운터

Hbase에서는 고급 기능인 카운터 기능도 제공하고 있습니다.

 

1) 카운터 소개

 

카운터 기능이 없다면 개발자는 수동으로 아래와 같은 절차를 만들어야 합니다.

 

  1. 로우 lock
  2. 값 조회
  3. 값 증가 후 write
  4. lock 해제

 

하지만 이러한  절차는 과도한 경합상황을 야기하며,

lock이 잠긴상태로 어플리케이션이 죽게되면 lock 정책에 따른 타임아웃이 끝나기 전까지는 다른 어플리케이션인 접근할 수 없게 됩니다.

 

Hbase에서는 클라이언트 측 호출을 통해서 이러한 절차를 원자적으로 처리할 수 있도록 제공하고 있습니다.

더불어, 한 호출로 여러개의 카운터 갱신 기능도 제공하고 있습니다.

 

아래는 hbase shell 에서 카운터 예제를 수행한 사진입니다.

 

 

incr 은 주어진 인자 만큼 카운터의 값을 증가 시킨 후 반환 받습니다.

get_counter는 현재 카운터 값을 반환합니다.

 

아래는 카운터도 보통의 컬럼 중에 하나라는것을 증명하는 사진입니다.

 

 

추가로 incr의 경우 음수를 인자로 주어 카운터를 감소시킬수도 있습니다.

 

 

2) 단일 카운터

 

Hbase 클라이언트는 단일 카운터만을 대상으로 처리할 수 있도록 제공하고 있습니다.

이때, 정확한 카운터 컬럼을 지정해야 합니다.

 

아래는 HTable 클래스에서 제공하는 메서드입니다.

 

long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)

 

카운터를 수행할 좌표값(row, family, qualifier) 와 카운터 증감 값을 인자로 받는것을 알 수 있습니다.

단, Durability 인자로 오버로딩이 되어 있습니다.

이 Durability 는 WAL에 반영을 어떻게 할건지에 대한 설정입니다.

 

Durability 는 enum으로 아래와 같은 값들이 정의되어 있습니다.

 

public enum Durability {
  USE_DEFAULT,
  SKIP_WAL,
  ASYNC_WAL,
  SYNC_WAL,
  FSYNC_WAL
}

 

 

3) 복수 카운터

 

카운터를 증가시키는 방법으로는 HTable의 increment 메서드가 있습니다.

 

아래는 increment 메서드 명세입니다.

 

Result increment(final Increment increment)

 

이 메서드를 사용하기 위해서는 Increment 인스턴스를 만들어야 합니다.

Increment 인스턴스에는 카운터 컬럼의 좌표값과 적절한 세부 사항을 넣어야 합니다.

 

아래는 Increment의 생성자입니다.

 

Increment(byte [] row)

 

생성자로 로우키를 먼저 필수로 지정한 뒤 범위를 줄이고 싶을때는 아래와 같은 메서드로 가능합니다.

 

Increment addColumn(byte [] family, byte [] qualifier, long amount)

 

카운터의 경우 버전은 내부적으로 처리하기 때문에 인자에 timestamp를 받는 부분이 없습니다.

또한, addFamily 메서드도 없는데 그 이유는 카운터는 특정 컬럼이기 때문입니다.

 

추가로 시간 범위를 지정하여 읽기 연산의 이점을 얻는 메서드도 제공하고 있습니다.

 

Increment setTimeRange(long minStamp, long maxStamp)

 

아래는 다중 컬럼의 카운터 연산을 하는 예제 입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));

Increment increment1 = new Increment(Bytes.toBytes("young!!"));
increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 1);
increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 10);
increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), 10);

Result result1 = hTable.increment(increment1);
result1.listCells().forEach(System.out::println);

Increment increment2 = new Increment(Bytes.toBytes("young!!"));
increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 5);
increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 0);
increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), -5);

Result result2 = hTable.increment(increment2);
result2.listCells().forEach(System.out::println);

 

Increment를 통해서 복수개의 컬럼에 카운터를 처리하는 것을 볼 수 있습니다.

 

4. 보조 처리기

Hbase에서는 보조 처리기라는 것을 이용하여 계산 작업의 일부를 리전 서버에 전가시키는 방법을 제공하고 있습니다.

앞절까지는 조회하는 데이터를 제한한 후 클라이언트에서 비즈니스 로직을 통해 계산했었습니다.

 

1) 보조 처리기 소개

 

Hbase에서 제공하는 보조 처리기는 임의의 코드를 각 리전 서버에서 직접 실행하게 해줍니다.

 

이 보조처리기의 경우 필터처럼 인터페이스를 구현하여 사용자 정의 보조 처리기를 만들 수 있습니다.

구현한 코드는 컴파일하여 jar 형태로 Hbase에 전달하면 됩니다.

이 보조 처리기는 필터와는 달리 동적으로 로드할 수 있어 Hbase ㅋ,ㄹ러스터의 기능성을 쉽게 확장할 수 있습니다.

 

보조 처리기를 구현하여 사용할 시 아래 두 클래스를 사용하면 됩니다.

 

1. 옵저버

 

특정 이벤트가 발생 시 콜백메서드를 수행하는 클래스입니다.

 

Hbase에서 제공하는 옵저버 인터페이스 종류는 아래와 같습니다.

org.apache.hbase:hbase-endpoint 를 추가해야 합니다.

 

  • RegionObserver = 데이터 조작 이벤트를 처리하는 옵저버, 테이블이 위치한 리전과 밀접하게 연관
  • MasterObserver = 관리 또는 DDL 유형의 동작에 반응하는 옵저버, 클러스터 전반에 걸친 이벤트를 처리
  • WALObserver = WAL 처리에 대한 콜백을 제공합니다.

 

2. 엔드포인트

 

엔드포인트는 RPC를 동적으로 확장하여 콜백 원격 절차를 추가한 것으로, RDBMS에서 프로시져로 이해하면 됩니다.

 

이 엔드포인트는 옵저버와 결합하여 사용할 수 있으며 모두 Coprocessor 인터페이스를 상속하고 있습니다.

 

 

2) Coprocessor 인터페이스

 

모든 보조 처리기는 이 인터페이스를 구현해야 합니다.

 

이 인터페이스에는 enum으로 Pritority, State를 제공하고 있습니다.

 

아래는 Pritority 값에 대한 설명입니다.

 

설명
SYSTEM 우선 순위가 가장 높으며 가장 먼저 실행되어야 하는 보조 처리기
USER SYSTEM 우선순위 값을 가진 보조처리기가 실행된 다음에 실행

 

보조 처리기는 각자 생명주기가 있고, 프레임워크에서 관리하게 됩니다.

 

Coprocessor 는 생명주기에 대해 상요자 정의를 제공하기 위해 아래와 같은 두 메서드를 제공합니다.

 

void start(CoprocessorEnvironment env) throws IOException
void stop(CoprocessorEnvironment env) throws IOException

 

이 메서드들은 보조처리기가 시작할때와 끝날때 호출되어집니다.

인자로 받는 CoprocessorEnvironment는 인스턴스의 생명주기 전체에 걸쳐 상태를 저장하는데 사용되어 집니다.

 

이제 두번째 enum State는 바로 이런 보조처리기의 생명주기의 상태값을 의미합니다.

 

설명
UNINSTALLED 보조 처리기가 최초 상태에 있음. 아직 환경을 갖지 않았고 초기화 되지 않음.
INSTALLED 인스턴스가 환경안에 설치되었음
STARTING 보조 처리기 시작 직전 상태, 즉 보조 치리기의 start 메서드가 실행되기 직전 상태 
ACTIVE start 메서드 호출에 대한 응답이 반환된 상태
STOPPING stop 메서드가 실행되기 직전 상태
STOPPED stop 메서드가 호출에 대한 응답이 반환된 상태

 

마지막으로 CoprocessorHost 클래스가 있습니다.

이 클래스는 보조 처리기의 호스트가 어디에서 사용되는지에 따라 하위 클래스가 구분됩니다.

호스트의 경우 마스터 서버, 리전 서버를 의미합니다.

 

아래는 클라이언트에서 요청한 연산이 어떻게 보조처리기에 적용되는지를 보여주는 그림입니다.

 

 

 

3) 보조 처리기 로드

 

보조 처리기는 정적, 동적 모두 로드 되도록 할 수 있습니다.

 

1. 설정 파일에 의한 로드

 

Hbase가 시작할 때 로드할 보조 처리기를 전역적으로 설정 할 수 있습니다.

 

방법은 아래와 같이 hbase-site.xml 을 수정하면 됩니다.

 

<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>coprocessor.RegionObserverExample, coprocessor.AnotherCoprocessor</value>
</property>
<property>
    <name>hbase.coprocessor.master.classes</name>
    <value>coprocessor.MasterObserverExalple</value>
</property>
<property>
    <name>hbase.coprocessor.wal.classes</name>
    <value>coprocessorWALObserverExample, bar.foo.MyWALObserver</value>
</property>

 

각 속성에 있는 처리기 순서가 실행 순서가 됩니다.

또한, 여기에 정의된 보조처리기는 모두 시스템 수준 우선순위를 가지고 로드 됩니다.

 

 

2. 테이블 지시자에 의한 로드

 

테이블 지시자로도 로드할 수 있습니다.

테이블 지시자의 경우 테이블 단위로 할당 되는것으로 이곳에 정의된 보조 처리기는 해당 테이블이 속한 리전 및 리전 서버에서만 로드됩니다.

 

리전에서만 로드 되는것으로 Master, WAL 처리기는 사용할 수 없습니다.

 

테이블 지시자는 setValue를 통해 아래와 같이 보조 처리기의 정의를 추가해야 합니다.

key는 COPROCESSOR로 시작해야하며, value는 <jar_파일_경로>|<클래스_이름>|<우선순위> 의 포맷이여야 합니다.

 

jar 파일 경로의 경우 hdfs의 경로를 사용할 수 있습니다.

 

아래는 쉘을 통해 처리기를 추가하는 예제입니다.

'COPROCESSOR$1' => '/Users/laura/test2.jar|coprocessor.AnotherTest|USER'

 

마지막 우선순위에는 위에서 언급한 SYSTEM, USER 중에 하나를 지정합니다.

 

위에서 $를 사용하여 처리기가 로드되는 순서를 지정할 수 있습니다.

 

 

4) RegionObserver 클래스

 

RegionObserver 클래스는 리전 수준에서 사용되는 클래스로서, 리전 안에서 발생하는 특정 동작에 따라 발동하는 hook 이 있습니다.

 

hook을 발동시키는 동작은 두개로 아래와 같이 나눌 수 있습니다.

 

  • 리전 생명 주기 변경
  • 클라이언트 API 호출

1. 리전 생명 주기 변경

 

옵저버는 아래 그림과 같이 [열리기 전, 열림, 닫히기 전] 상태 변경에 반응할 수 있습니다.

 

 

열리기 전 상태

 

리전이 열리기 직전에 이 상태가 됩니다.

옵저버는 아래 메서드를 통해 리전을 열기 직전과 열린 직후에 프로세스에 영향을 줄 수 있습니다.

 

void preOpen()
void postOpen()

 

추가로 리전이 열리기 전 상태가 지나고 열림 상태 직전에 WAL에 있는 정보를 사용할 수도 있습니다.

이런경우를 위해 아래와 같은 메서드 hook 이 있습니다.

 

void preWALRestore()
void postWALRestore()

 

열림 상태

 

리전이 리전 서버에 배치되고 완전히 동작 가능한 상태입니다.

 

아래는 각 flush, compact, split 에 대한 동작에 대한 hook을 제공하는 메서드입니다.

 

void preFlush()
void postFlush()
void preCompact()
void postCompact()
void preSplit()
void postSplit()

 

닫히기 전 상태

 

리전 닫기가 임박한 상태입니다.

 

아래와 같은 메서드로 리전이 닫히기 직전과 직후에 대해서 핸들링할 수 있습니다.

 

void preClose(..., boolean abortRequested)
void postClose(..., boolean abortRequested)

 

abortRequested 인자는 리전이 닫히는 이유입니다.

일반적으로는 로드밸런싱을 위해 리전이 분할할 때 닫히게 되지만 일부 오작동으로 인해 닫히는 경우가 있기 때문입니다.

 

 

2. 클라이언트 API 이벤트 처리

 

클라이언트 API는 모두 명시적으로 리전 서버로 전달됩니다.

보조 처리기는 이 API 메서드가 실행되기 직전과 직후에 대한 hook을 제공합니다.

 

아래는 제공 hook 입니다.

 

  • void preGet / void postGet
  • void prePut /  void postPut
  • void preDelete / void postDelete
  • boolean preCheckAndPut / boolean postCheckAndPut
  • boolean preCheckAndDelete / boolean postCheckAndDelete
  • void preGetClosestRowBefore / void postGetClosestRowBefore
  • boolean preExists / boolean postExists
  • long preIncrementColumnValue / long postIncrementColumnValue
  • void preIncrement / void postIncrement
  • InternalScanner preScannerOpen / InternalScanner postScannerOpen
  • boolean preScannerNext / boolean postScannerNext
  • void preScannerClose / void postScannerClose

 

3. ResionCoprocessorEnvironment 클래스

 

RegionObserver 인터페이스를 구현하는 인스턴스는 RegionCoprocessorEnvironment 클래스를 상속하게 됩니다.

 

RegionCoprocessorEnvironment  클래스는 이름 그대로 리전에 환경 정보를 담당하는 클래스로 아래와 같은 메서드가 지원됩니다.

 

  • getRegion() = 현재 옵저버가 연관된 리전에 대한 참조 반환
  • getRegionserverServices() = 공유자원인 RegionServerServices 인스턴스에 대한 접근 제공

 

4. ObserverContext 클래스

 

ObserverContext는 옵저버 인스턴스의 현재 환경에 대한 접근을 제공하며,

콜백 메서드의 수행이 완료한 뒤에 보조 처리기 프레임워크가 무엇을 할지 지정할 수 있는 기능을 제공합니다.

 

아래는 제공하는 기능 중 중요한 두가지입니다.

 

  • bypass = 보조 처리기의 연쇄적 실행 흐름에 영향 -> 다음 보조 처리기를 타지 않게 합니다.
  • complete = 서버 측 프로세스를 중지

 

5) MasterObserver 클래스

 

MasterObserver 클래스는 마스터 서버가 호출 할 수 있는 콜백 메서드를 처리하는 기능을 제공합니다.

 

아래는 MasterObserver hook 종류입니다.

 

  • void preCreateTable / void postCreateTable
  • void preDeleteTable / void postDeleteTable
  • void preModifyTable / void postModifyTable 
  • void preAddColumn / void postAddColumn
  • void preModifyColumn / void postModifyColumn
  • void preDeleteColumn / void postDeleteColumn
  • void preEnableTable / void postEnableTable
  • void preDisableTable / void postDisableTable
  • void preMove / void postMove
  • void preAssign / void postAssign
  • void preUnassign / void postUnassign
  • void preBalance / void postBalance
  • boolean preBalanceSwitch / void postBalanceSwitch
  • void preShutdown
  • void preStopMaster

 

6) 엔드포인트 보조 처리기

 

하나의 리전에서만이 아닌 모든 리전에서 각자 어떠한 동작을 수행 후 취합하기를 원하는 경우가 있습니다.

하지만 지금까지 알아본 가능으로는 위와같이 할 수 없습니다.

할 수 있더라도 아마 모든 테이블을 스캔하는 동작이기에 성능상 안좋습니다.

 

HBase에서는 이러한 문제를 위해 엔드포인트 보조 처리기를 제공합니다.

 

1. CoprocessorProtocol

 

클라이언트에게 사용자 정의 RPC 프로토콜을 제공하려면 CoprocessorProtocol을 상속하는 인터페이스를 정의해야 합니다.

 

이 프로토콜을 사용하면 HTable이 제공하는 아래 메서드를 통해 보조 처리기 인스턴스와 통신할 수 있습니다.

 

<T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, byte[] row)
<T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
<T extends CoprocessorProtocol, R> void coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Call<T, R> callback)

 

위의 메서드에서 알 수 있듯이 CoprocessorProtocol 인스턴스는 리전과 연동되기 때문에, 클라이언트에서는 미리 어떤 리전에서 실행되어야 하는지 알아야하는 단점이 있습니다.

 

 

2. BaseEndPointCoprocessor 클래스

 

엔드포인트 보조 처리기를 구현하기 위해서는 위의 CoprocessorProtocol 뿐만이 아니라 BaseEndPointCoprocessor 클래스도 확장해야 합니다.

 

CoprocessorProtocol는 클라이언트와 서버의 RPC 프로토콜을 정의한거라면 BaseEndPointCoprocessor 는 실제 처리를 정의하는 구현체입니다.

 

아래는 엔드포인트 보조 처리기를 통해 호출되는 과정을 그림으로 나타낸 것입니다.

 

 

 

출처: 라스조지 [Hbase 완벽가이드] 한빛미디어 2013년 297p

 

5. HTablePool

HTable을 계속 생성하는것은 큰 오버헤드를 줄 수 있습니다.

그 이유는 HTable 인스턴스를 생성하는데 비용이 생각보다 크기 때문입니다.

 

HTable을 여러 쓰레드가 공유해서 쓰는것도 불가능 합니다. 이유는 HTable은 thread-safe 하지 않기 때문입니다.

 

때문에 HTablePool 클래스를 통해 해결해야 합니다.

 

HTablePool 클래스는 오직 클라이언트 API 인스턴스를 풀링하는 목적으로 만들어 졌으며,

생성자 명세는 아래와 같습니다.

 

HTablePool()
HTablePool(Configuration config, int maxSize)
HTablePool(Configuration config, int maxSize, HTableInterfaceFactoey tableFactory)

 

maxSize는 관리할 인스턴스 수 이고 tableFactory는 인스턴스를 생성할 팩토리 클래스입니다.

 

HTablePool은 테이블 단위로 풀을 관리하며 사용시에는 아래와 같은 메서드를 이용하면 됩니다.

 

HTableInterface getTable(String tableName)
HTableInterface getTable(byte[] tableName)
void putTable(HTableInterface table)

 

 

아래는 풀을 닫을 때 사용하는 메서드입니다.

 

void closeTablePool(String tableName)
void closeTablePool(byte[] tableName)

 

 

6. 연결처리

HBase 클라이언트는 인스턴스 생성 시 리전 정보를 가져오기 위해 주키퍼와 연결을 맺고, 클라이언트 쪽에 캐싱을 해두고 사용합니다.

내부적으로 캐싱된 정보로 요청시 리전을 못찾는 오류의 경우에는 주키퍼에 다시 한번 콜하여 캐시정보를 갱신합니다.

 

주키퍼에 대한 연결은 사용자가 관리해야하며 무작위로 연결을 증가하다보면, 가능한 연결 갯수를 넘어 IOException이 발생 할 수 있습니다.

 

때문에, 사용자는 작업이 끝나면 HTable의 close메서드를 호출해야합니다.

 

7. 마무리

 

이번 포스팅에서는 클라이언트 API : 고급 기능에 대해 진행하였습니다.

 

다음 포스팅에서는 챕터 5장인 클라이언트 API : 관리 기능에 대해 진행하겠습니다.

반응형

'BigData > Hbase' 카테고리의 다른 글

(5) 클라이언트 API : 관리 기능  (0) 2020.06.02
(3) 클라이언트 API : 기본 기능  (0) 2020.04.09
(2) 설치  (0) 2020.04.08
(1) 소개  (0) 2020.04.07
반응형

1. 서론

이번 포스팅에서는 Chapter10의 람다를 이용한 도메인 전용 언어에 대해 진행하도록 하겠습니다.

 

2. 도메인 전용 언어

도메인 전용 언어 ( = Domain Specific Languages )는 특정 비즈니스 도메인의 문제를 해결하기 위해 만든 언어입니다.

간단히, 특정 비즈니스 도메인을 인터페이스로 만든 API라고 말할 수 있습니다.

 

아래 2가지를 생각하면서 DSL을 개발해야 합니다.

 

  • 코드의 의도가 개발자가 아니더라도 이해할 수 있도록 해야합니다.
  • 가독성을 높여 유지보수를 쉽게 할 수 있도록 해야합니다.

 

1) DSL의 장점과 단점

 

DSL은 비즈니스 의도와 가독성 측면에서는 좋지만, 해당 코드가 올바른지 검증과 유지보수를 해야하는 책임이 잇따르게 됩니다.

 

때문에, 장점과 단점이 있습니다.

 

아래는 DSL의 장점입니다.

 

  • 간결함 : 비즈니스 로직을 캡슐화하여 반복을 피하고 간결해집니다.
  • 가독성 : 도메인 영역의 용어를 사용하므로 비 도메인 전문가도 코드를 이해 할 수 있습니다.
  • 유지보수 : 간결함과 가독성으로 인해 어플리케이션의 유지보수가 좋습니다.
  • 높은 수준의 추상화 : 도메인과 같은 추상화 수준에서 동작하므로 도메인의 문제와 직접적으로 관련되지 않은 세부사항을 숨깁니다.
  • 집중 : 비즈니스 도메인을 표현하기 위한 언어이므로 특정 코드에 집중할 수 있습니다.
  • 관심사 분리 : 인프라구조 관련된 문제와 독립적으로 비즈니스 관련된 코드가 분리되어 집니다.

 

아래는 DSL의 단점입니다.

 

  • DSL 설계의 어려움 : 제한적인 언어에 도메인 지식을 담는것이 쉬운 작업은 아닙니다.
  • 개발 비용 : DSL을 프로젝트에 추가하는 것은 많은 비용과 시간이 소모됩니다.
  • 추가 우회 계층 : DSL 은 새로운 계층으로 기존 조메인 모델을 감싸는 형태가 되며, 계층을 최대한 작게 만들어 성능문제가 발생할 수 있습니다.
  • 새로 배워야 하는 언어 : DSL도 결국에는 언어이기 때문에, 해당 언어를 배워야합니다.
  • 호스팅 언어 한계 : 자바와 같이 엄격한 문법을 가진 언어는 사용자 친화적 DSL을 만드는데 한계가 있습니다.

 

2) JVM에서 이용할 수 있는 다른 DSL 해결책

 

DSL은 내부 DSL외부 DSL 이라는 카테고리로 나눌 수 있습니다.

 

  • 내부 DSL : 순수 자바코드 같은 기존 호스팅 언어를 기반으로 구현한 언어
  • 외부 DSL : 호스팅 언어와는 독립적으로 자체의 문법을 가지는 언어 (ex : SQL)

추가로 최근 자바는 아니지만 스칼라, 코틀린과 같이 JVM에서 실행되는 언어들이 나옴으로 인해 다중 DSL이라는 카테고리가 추가 되었습니다.

 

 

1.  내부 DSL

 

엄격한 문법을 가진 자바의 경우 DSL을 구현하는데 한계가 있었습니다.

 

하지만, 자바 8에서 나온 람다, 메서드 참조 등을 이용해 한계는 어느정도 해결할 수 있게 되었습니다.

 

자바를 통해 DSL을 구현함으로 얻는 장점은 아래와 같습니다.

 

  1. 새로운 언어와 기술을 배울 노력이 줄어듭니다.
  2. 다른 언어가 아닌 자바로 DSL을 구현하게 되면, 나머지 코드와 같이 컴파일이 가능해집니다.
  3. 자바를 사용하는 IDE의 혜택을 누릴수 있습니다.
  4. 자바는 유지보수에는 좋기 때문에 향후 도메인 변경 이슈에 대해 용이합니다.

 

2. 다중 DSL

 

JVM에서 동작하는 다양한 언어가 있어, 자바 어플리케이션이지만 DSL은 스칼라같은 언어로 대체가 되어 유연해질 수 있습니다.

 

스칼라가 자바에 비해 함수형 언어에 가까워 DSL을 구현하기에는 적합합니다.
이유는 함수형 언어는 객체지향보다 좀 더 코드가 직관적이기 때문입니다.

 

하지만 아래와 같은 단점도 존재합니다.

 

  • 새로운 프로그래밍을 배우거나 팀원 중 해당 언어에 대해 숙련자가 있어야 합니다.
  • 두개 이상의 언어가 혼재하므로 컴파일 빌드 시 과정을 개선해야 합니다.
  • 같은 JVM에서 동작은 하지만 사실상 100% 자바와 호환이 되는것은 아닙니다.

 

3. 외부 DSL

 

외부 DSL을 구현하기 위해서는 자신만의 문법과 구문으로 새 언어를 설계해야합니다.

 

또한, 새 언어를 파싱하고 파서의 결과를 분석하여 외부 DSL을 실행할 코드도 만들어야 합니다.

 

이러한 작업들은 매우 어려우며 잘못하면 성능이슈와 더불어 프로그램이 안전하지 않을 수 있습니다.

 

하지만 자신만의 언어를 설계함으로 인해 무한한 유연성을 취할 수 있다는 장점은 존재합니다.

또한, 자바코드와 외부 DSL코드의 명확한 분리가 된다는 장점도 존재합니다.

 

3. 최신 자바 API의 작은 DSL

자바 8에서 등장한 람다, 디폴트 메서드로 인해 작은 DSL을 제공하고 있다고 볼 수 있습니다.

 

아래는 자바 컬렉션에서 제공하는 작은 정렬 DSL입니다.

 

Collections.sort(persons, comparing(Person::getAge).reverse());

 

위 comparing, reverse와 같은 것들이 최신 자바에서 제공하는 DSL로 볼 수 있습니다.

 

자바 개발자가 아니더라도, comparing을 통해 비교를 하고, reverse를 보고 반대로 정렬하도록 되어있다는 것을 볼 수 있기 때문입니다.
또한, 내부적으로 어떠한 일을 하는지 숨겨놨기 때문에 자바로 만든 DSL로 볼 수 있습니다.

 

 

1) 스트림 API는 컬렉션을 조작하는 DSL

 

Stream 인터페이스는 네이티브 자바 API에 작은 내부 DSL을 적용한 좋은 예입니다.

 

List<String> errors = Files.lines(Paths.g(fileName))
        .filter(line -> line.startsWith("ERROR"))
        .limit(40)
        .collect(Collectors.toList());

 

위의 filter, limit, collect 모두 작은 DSL로 볼 수 있습니다.

 

 

2) 데이터를 수집하는 DSL인 Collectors

 

위에서 본 filter와 limit의 경우에는 Stream 인터페이스에서 제공하는 데이터 조작 DSL이라고 볼 수 있습니다.

 

하지만 마지막 collect의 경우에는 데이터를 수집하는 DSL로 간주 할 수 있습니다.

 

Collectors 클래스 또한 디폴트 메서드를 통해 쉽세 DSL을 사용할 수 있습니다.

 

다만, 가독성 측면에서 Collectors는 좋지 않을때가 있는데 아래는 그 예로 Comparator와 비교한 코드입니다.

 

// Collectors
Map<String, Map<Color, List<Car>>> carsByBrandAndColor =
                cars.stream().collect(Collectors.groupingBy(Car::getBrand), Collectors.groupingBy(Car::getColor));

// Comparator
Comparator comparator = Comparator.comparing(Person::getAge).thenComparing(Person::getName);

 

위는 Collectors의 중첩함수와 Comparator의 비교 함수입니다.

 

Collectors의 중첩의 경우 메서드 안에 메서드가 있는 형태입니다.

이는 Comparator의 체이닝메서드 형태보다 가독성 측면에서 떨어지게 됩니다.

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

4. 자바로 DSL을 만드는 패턴과 기법

 

위 3번의 Collectors 와 같이 제공하는 DSL이 모두 좋지 않을 수 있습니다.

때문에 이번에는 직접 DSL을 만드는 패턴과 기법에 대해 소개하겠습니다.

 

먼저 들어가기에 앞서 DSL은 특정 도메인 모델을 위한 언어이므로 아래와 같은 도메인을 설정하고 시작하겠습니다.

 

@Getter
@Setter
public class Stock {
    private String symbol;
    private String merket;
}


@Getter
@Setter
public class Trade {
    
    public enum Type {BUY, SELL};
    private Type type;
    private Stock stock;
    private int quantity;
    private double price;
    
    public double getValue() {
        return quantity * price;
    }
}



public class Order {

    @Getter
    private String customer;
    private List<Trade> trades = new ArrayList<>();
    
    public void addTrade(Trade trade) {
        trades.add(trade);
    }
    
    public double getValue() {
        return trades.stream().mapToDouble(Trade::getValue).sum();
    }
    
}

 

아래는 위 도메인 모델을 이용하여 BigBank라는 고객이 요청한 두 거래를 포함하는 주문을 만들은 예제입니다.

 

Order order = new Order();
order.setCustomer("BigBank");

Trade trade1 = new Trade();
trade1.setType(Trade.Type.BUY);

Stock stock1 = new Stock();
stock1.setSymbol("IBM");
stock1.setMerket("NYSE");

trade1.setStock(stock1);
trade1.setPrice(125.00);
trade1.setQuantity(80);
order.addTrade(trade1);

Trade trade2 = new Trade();
trade2.setType(Trade.Type.SELL);

Stock stock2 = new Stock();
stock2.setSymbol("GOOGLE");
stock2.setMerket("NASDAQ");

trade2.setStock(stock2);
trade2.setPrice(375.00);
trade2.setQuantity(50);
order.addTrade(trade2);

 

위 코드는 한눈에 봐도 장황하며 비개발자가 이해하기는 더욱 불가능한 코드입니다.

 

직관적으로 도메인 모델을 반영할 수 있는 DSL이 필요한 시점입니다.

 

1) 메서드 체인

 

DSL에서 가장 흔한 방식 중 하나가 메서드 체인 방식입니다.

 

메서드 체인을 사용하면 위에서 도메인 객체를 만들기 위한 장황한 코드는 아래와 같이 바뀔수 있습니다.

 

Order order = forCustomer("BigBank")
        .buy(80)
        .stock("IBM")
        .on("NYSE")
        .at(125.00)
        .buy(50)
        .stock("GOOGLE")
        .on("NASDAQ")
        .at(375.00)
        .end();

 

위와 같은 DSL을 제공하기 위해서는 몇개의 빌더 클래스를 만들어야 합니다.

 

public class MethodChainingOrderBuilder {
    
    public final Order order = new Order();
    
    private MethodChainingOrderBuilder(String customer) {
        order.setCustomer(customer);
    }
    
    public static MethodChainingOrderBuilder forCustomer(String customer) {
        return new MethodChainingOrderBuilder(customer);
    }
    
    public TradeBuilder buy(int quantity) {
        return new TradeBuilder(this, Trade.Type.BUY, quantity);
    }
    
    public TradeBuilder sell(int quantity) {
        return new TradeBuilder(this, Trade.Type.SELL, quantity);
    }
    
    public MethodChainingOrderBuilder addTrade(Trade trade) {
        order.addTrade(trade);
        return this;
    }
    
    public Order end() {
        return order;
    }
}

 

public class TradeBuilder {
    private final MethodChainingOrderBuilder builder;
    public final Trade trade = new Trade();

    public TradeBuilder(MethodChainingOrderBuilder builder, Trade.Type type, int quantity) {
        this.builder = builder;
        trade.setType(type);
        trade.setQuantity(quantity);
    }

    public StockBuilder stock(String symbol) {
        return new StockBuilder(builder, trade, symbol);
    }
}

 

public class StockBuilder {
    private final MethodChainingOrderBuilder builder;
    private final Trade trade;
    private final Stock stock = new Stock();

    public StockBuilder(MethodChainingOrderBuilder builder, Trade trade, String symbol) {
        this.builder = builder;
        this.trade = trade;
        stock.setSymbol(symbol);
    }

    public TradeBuilderWithStock on(String market) {
        stock.setMerket(market);
        trade.setStock(stock);
        return new TradeBuilderWithStock(builder, trade);

    }
}

 

@AllArgsConstructor
public class TradeBuilderWithStock {
    private final MethodChainingOrderBuilder builder;
    private final Trade trade;

    public MethodChainingOrderBuilder at(double price) {
        trade.setPrice(price);
        return builder.addTrade(trade);
    }
}

 

이러한 빌드 클래스를 이용하여 좀 더 직관적인 도메인 객체를 생성하는 DSL을 만들 수 있게 되었습니다.

 

하지만, 복잡한 빌드 클래스를 만들어야 한다는 단점이 있습니다.

 

 

2) 중첩된 함수 이용

 

중첩된 함수 DSL 패턴은 다른 함수안에 함수를 이용해 도메인 모델을 만듭니다.

 

아래는 중첩 함수 DSL를 사용하여 도메인 객체를 만드는 예제 입니다.

 

Order order = order("BigBank",
        buy(80,
                stock("IBM", on("NYSE")), at(125.00)),
        sell(50,
                stock("GOOGLE", on("NASDAQ")), at(375.00))
);



public class NestedFunctionOrderBuilder {
    
    public static Order order(String customer, Trade... trades) {
        Order order = new Order();
        order.setCustomer(customer);
        Stream.of(trades).forEach(order::addTrade);
        return order;
    }
    
    public static Trade buy(int quantity, Stock stock, double price) {
        return buildTrade(quantity, stock, price, Trade.Type.BUY);
    }

    public static Trade sell(int quantity, Stock stock, double price) {
        return buildTrade(quantity, stock, price, Trade.Type.SELL);
    }
    
    private static Trade buildTrade(int quantity, Stock stock, double price, Trade.Type buy) {
        Trade trade = new Trade();
        trade.setQuantity(quantity);
        trade.setType(buy);
        trade.setStock(stock);
        trade.setPrice(price);
        return trade;
    }
    
    public static double at(double price) {
        return price;
    }
    
    public static Stock stock(String symbol, String market) {
        Stock stock = new Stock();
        stock.setSymbol(symbol);
        stock.setMerket(market);
        return stock;
    }
    
    public static String on(String market) {
        return market;
    }
}

 

 

메서드 체인에 비해 함수의 중첩 방식이 도메인 객체 계층 구조에 그대로 반영된다는 것이 장점입니다.

 

하지만 이러한 DSL은 사용시 괄호가 많아진다는 단점이 있습니다.

 

 

3) 람다 표현식을 이용한 함수 시퀀싱

 

위 중첩함수 DSL을 이번에는 람다를 사용하여 더욱 깔끔하게 만들 수 있습니다.

 

아래는 예제입니다.

 

Order order = order( o-> {
    o.forCustomer("BigBank");
    o.buy( t -> {
        t.quantity(80);
        t.price(125.00);
        t.stock( s -> {
            s.symbol("IBM");
            s.market("NYSE");
        });
    });
    o.sell(t -> {
        t.quantity(50);
        t.price(375.00);
        t.stock(s -> {
            s.symbol("GOOGLE");
            s.market("NASDAQ");
        });
    });
});

 

public class LambdaOrderBuilder {
    private Order order = new Order();
    
    public static Order order(Consumer<LambdaOrderBuilder> consumer) {
        LambdaOrderBuilder builder = new LambdaOrderBuilder();
        consumer.accept(builder);
        return builder.order;
    }
    
    public void forCustomer(String customer) {
        order.setCustomer(customer);
    }
    
    public void buy(Consumer<LambdaTradeBuilder> consumer) {
        trade(consumer, Trade.Type.BUY);
    }

    public void sell(Consumer<LambdaTradeBuilder> consumer) {
        trade(consumer, Trade.Type.SELL);
    }
    
    private void trade(Consumer<LambdaTradeBuilder> consumer, Trade.Type type) {
        LambdaTradeBuilder builder = new LambdaTradeBuilder();
        builder.trade.setType(type);
        consumer.accept(builder);
        order.addTrade(builder.trade);
    }
}
public class LambdaTradeBuilder {
    public Trade trade = new Trade();

    public void quantity(int quantity) {
        trade.setQuantity(quantity);
    }

    public void price(double price) {
        trade.setPrice(price);
    }

    public void stock(Consumer<LambdaStockBuilder> consumer) {
        LambdaStockBuilder builder = new LambdaStockBuilder();
        consumer.accept(builder);
        trade.setStock(builder.stock);
    }
}
public class LambdaStockBuilder {
    public Stock stock = new Stock();

    public void symbol(String symbol) {
        stock.setSymbol(symbol);
    }

    public void market(String market) {
        stock.setMerket(market);
    }
}

 

위 방법은 앞서 봤던 2가지 방법의 장점을 모두 가지고 있습니다.


위 3가지 방법은 모두 장단점을 가지고 있으며 선호에 따라 3가지의 방법을 조합하여 사용해도 됩니다.

 

4) DSL에 메서드 참조 사용하기

 

DSL을 만들때 메서드 참조를 사용하여 더욱 직관적으로 만들 수 있습니다.

 

아래는 위 주식 거래 도메인에서 계산을 하는 DSL을 만드는 예제입니다.

 

double value = new TaxCalculator()
    .withTaxRegional()
    .withTaxSurcharge()
    .calculate(order);

 

public class TaxCalculator {
    private boolean useRegional;
    private boolean useGeneral;
    private boolean useSurcharge;

    public TaxCalculator withTaxRegional() {
        useRegional = true;
        return this;
    }

    public TaxCalculator withTaxGeneral() {
        useGeneral = true;
        return this;
    }

    public TaxCalculator withTaxSurcharge() {
        useSurcharge = true;
        return this;
    }

    public double calculate(Order order) {
        return calculate(order, useRegional, useGeneral, useSurcharge);
    }

    public static double calculate(Order order, boolean useRegional, boolean useGeneral,
                                   boolean useSurcharge) {
        double value = order.getValue();
        if(useRegional) value = Tax.regional(value);
        if(useGeneral) value = Tax.general(value);
        if(useSurcharge) value = Tax.surcharge(value);
        return value;
    }
}

 

위의 TaxCalculator 클래스는 각 boolean 값을 사용하여 확장성 부분에서 미약합니다.

하지만, 메서드 참조를 통해 리팩터링이 가능합니다.

 

아래는 메서드 참조를 통해 리팩터링한 코드입니다.

 

double value = new TaxCalculator()
    .with(Tax::regional)
    .with(Tax::surcharge)
    .calculate(order);

 

public class TaxCalculator {
    public DoubleUnaryOperator taxFunction = d -> d;

    public TaxCalculator with(DoubleUnaryOperator f) {
        taxFunction = taxFunction.andThen(f);
        return this;
    }

    public double calculate(Order order) {
        return taxFunction.applyAsDouble(order.getValue());
    }
}

 

5. 마무리

이번 포스팅에서는 Chapter10 람다를 이용한 도메인 전용 언어에 대해 진행하였습니다.

다음에는 Chapter11 null 대신 Optional 클래스에 대해 포스팅하겠습니다.

반응형
반응형

1. 서론

이번 포스팅에서는 Chapter9의 리팩터링, 테스팅, 디버깅 에 대해 진행하도록 하겠습니다.

 

2. 가독성과 유연성을 개선하는 리팩터링

 

람다를 사용한 코드는 동작 파라미터화를 통해 다양한 요구사항에 대응할 수 있습니다.

 

1) 코드 가독성 개선

 

코드 가독성이란 "어떤 코드를 다른 사람도 쉽게 이해할 수 있음" 을 의미합니다.

 

람다를 사용하게되면 이 코드 가독성을 높일 수 있습니다.

 

2) 익명 클래스를 람다 표현식으로 리팩터링하기

 

아래는 익명클래스를 람도로 리팩터링한 예제입니다.

 

Runnable r1 = new Runnable() {
    @Override
    public void run() {
        System.out.println("Hello");
        
    }
};

Runnable r2 = () -> System.out.println("Hello");

 

한눈에 봐도 코드가 간결해지면서 가독성이 올라간것을 볼 수 있습니다.

 

하지만 모든 익명 클래스를 람다로 바꿀수 있는것은 아닙니다.

 

1. 익명 클래스에서 사용한 this, super는 람다와 다릅니다.

 

익명클래스의 this는 익명클래스 자신을 가리키는 반면,

람다는 this의 경우 람다를 감사는 클래스를 가리킵니다.

 

2. 익명 클래스는 감싸고 있는 클래스의 변수를 가릴수 있지만 람다는 가릴수 없습니다.

 

아래는 예제입니다.

 

int a = 10;

Runnable r1 = new Runnable() {
    @Override
    public void run() {
        int a =2; // 정상 동작
        System.out.println(a)
    }
};

Runnable r2 = () -> {
    int a= 2; // 컴파일 에러
    System.out.println(a);
};

 

3. 익명클래스는 람다 표현식으로 바꿀시 컨텍스트 오버로딩에 따른 모호함이 발생합니다.

 

이런 경우, 명시적 형변환을 통해 모호함을 제거할 수 있습니다.

 

아래는 예제입니다.

 

interface Task {
    void execute();
}

public static void doSomething(Task a) {a.execute();};
public static void doSomething(Runnable r) {r.run();};

public static void main(String[] args) {
    doSomething((Task)() -> System.out.println("Danger Danger!!"));
}

 

3) 람다 표현식을 메서드 참조로 리팩터링하기

 

람다에서 메서드 참조를 통해 더욱 가독성을 높일 수 있습니다.

 

아래는 예제입니다.

 

enum CaloricLevel {
    DIET, NORMAL, FAT
}

@Getter
@RequiredArgsConstructor
private static class Dish {
    private final String name;
    private final boolean vegetarian;
    private final int calories;
    private final Type type;
    enum Type {
        MEAT, FISH, OTHER
    }
    
    public CaloricLevel getCaloricLevel() {
        if (this.getCalories() <= 400) return CaloricLevel.DIET;
        else if (this.getCalories() <= 700) return CaloricLevel.NORMAL;
        else return CaloricLevel.FAT;
    }
}

public static void main(String[] args) {
    List<Dish> menu = Arrays.asList(
            new Dish("pork", false, 800, Dish.Type.MEAT),
            new Dish("beef", false, 700, Dish.Type.MEAT),
            new Dish("chicken", false, 400, Dish.Type.MEAT),
            new Dish("french fries", true, 530, Dish.Type.OTHER),
            new Dish("rice", true, 350, Dish.Type.OTHER),
            new Dish("season fruit", true, 120, Dish.Type.OTHER),
            new Dish("pizza", true, 550, Dish.Type.OTHER),
            new Dish("prawns", false, 300, Dish.Type.FISH),
            new Dish("salmon", false, 450, Dish.Type.FISH)
            ;
    Map<CaloricLevel, List<Dish>> dishesByCaloricLevel = 
                    menu.stream().collect(groupingBy(Dish::getCaloricLevel));
}

 

4) 명령형 데이터 처리를 스트림으로 리팩터링하기

 

반복자를 통한 명령형 처리를 스트림으로 리팩터링할 시 더욱 가독성이 올라갑니다.

 

아래는 예제입니다.

 

List<String> dishNames = new ArrayList<>();
for (Dish dish : menu) {
    if(dish.getCalories() > 300) {
        dishNames.add(dish.getName());
    }
}

menu.parallelStream()
        .filter(d -> d.getCalories() > 300)
        .map(Dish::getName)
        .collect(Collectors.toList());

 

 

5) 코드 유연성 개선

 

람다 표현식으로 동작 파라미터화를 쉽게 가능한것을 앞절에서 살펴봤습니다.

 

이것은 결국 코드가 유연하게 돌아간다는 것을 의미하게 됩니다.

 

대표적으로 앞에서 살펴본 실행 어라운드가 있습니다.

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

3. 람다로 객체지향 디자인 패턴 리팩터링하기

기존 자바의 유지보수와 더욱 깔끔한 코드를 생성하기 위해 디자인 패턴을 사용했었습니다.

 

람다를 이용하면 이 디자인 패턴으로 해결하던 문제를 더욱 쉽고 간단하게 해결할 수 있습니다.

 

1) 전략

 

전략(strategy) 패턴은 런타임에 적절한 알고리즘을 선택하는 기법입니다.

 

아래는 기존 디자인 패턴으로 전략패턴을 구현한 예제입니다.

 

public interface ValidationStrategy {
    boolean execute(String s);
}

public class IsAllLowerCase implements ValidationStrategy {
    @Override
    public boolean execute(String s) {
        return s.matches("[a-z]+");
    }
}

public class IsNumeric implements ValidationStrategy {
    @Override
    public boolean execute(String s) {
        return s.matches("\\d+");
    }
}

public class Validator {
    private final ValidationStrategy validationStrategy;
    public Validator(ValidationStrategy v) {
        this.validationStrategy = v;
    }
    public boolean validate(String s) {
        return validationStrategy.execute(s);
    }
}

public static void main(String[] args) {
    Validator numericValidator = new Validator(new IsNumeric());
    boolean b1 = numericValidator.validate("aaaa");
    Validator lowerValidator = new Validator(new IsAllLowerCase());
    boolean b2 = lowerValidator.validate("bbbb");
}

 

위에는 구현체를 모두 정의하여 사용해야합니다.

 

하지만 람다를 사용하게 되면 아래와 같이 리팩터링이 가능합니다.

 

public interface ValidationStrategy {
    boolean execute(String s);
}

public class Validator {
    private final ValidationStrategy validationStrategy;
    public Validator(ValidationStrategy v) {
        this.validationStrategy = v;
    }

    public boolean validate(String s) {
        return validationStrategy.execute(s);
    }
}

public static void main(String[] args) {
    Validator numericValidator = new Validator( s -> s.matches("[a-z]+"));
    boolean b1 = numericValidator.validate("aaaa");

    Validator lowerValidator = new Validator(s -> s.matches("\\d+"));
    boolean b2 = lowerValidator.validate("bbbb");
}

 

2) 템플릿 메서드

 

템플릿 메서드 패턴은 알고리즘의 일부만을 고쳐야하는 상황에 필요합니다.

 

아래는 디자인 패턴으로 템플릿 메서드 패턴을 구현한 예제입니다.

 

abstract class OnlineBanking {
    public void processCustomer(int id) {
        Customer c = Database.getCustomerWithId(id);
        makeCustomerHappy(c);
    }
    
    abstract void makeCustomerHappy(Customer c);
}

 

위 같은 경우에는 OnlineBanking의 추상클래스를 상속받은 하위 클래스가 필요합니다.

 

하지만 람다를 사용한다면 굳이 추상, 하위 클래스 구조를 만들지 않아도 됩니다.

 

예제는 아래와 같습니다.

 

class OnlineBanking {
    public void processCustomer(int id, Consumer<Customer> makeCustomerHappy) {
        Customer c = Database.getCustomerWithId(id);
        makeCustomerHappy.accept(c);
    }
}

 

3) 의무 체인

 

작업 처리 객체의 체인을 만들 때는 의무 체인 패턴을 사용합니다.

 

아래는 디자인 패턴을 통해 의무체인 패턴을 구현한 코드입니다.

 

@Setter
public abstract class ProcessingObject<T> {
    protected ProcessingObject<T> successor;

    public T handle(T input) {
        T r = handleWork(input);
        if (successor != null) {
            return successor.handle(input);
        }
        return r;
    }
    abstract protected T handleWork(T input);
}
    
public class HeaderTextProcessing extends ProcessingObject<String> {

    @Override
    protected String handleWork(String input) {
        return "From Raoul, Mario and Alan: " + input;
    }
}

public class SpellCheckProcessing extends ProcessingObject<String> {

    @Override
    protected String handleWork(String input) {
        return input.replaceAll("labda", "lambda");
    }
}

public static void main(String[] args) {
    ProcessingObject<String> p1 = new HeaderTextProcessing();
    ProcessingObject<String> p2 = new SpellCheckProcessing();
    
    p1.setSuccessor(p2);
    
    String result = p1.handle("Aren't labdas really sexy?!!");
    System.out.println(result);
}

 

위와 같은 작업 처리는 Function의 andThen 메서드를 통해 간편하게 구현이 가능해졌습니다.

 

코드는 아래와 같습니다.

 

UnaryOperator<String> headerTextProcessing = s -> "From Raoul, Mario and Alan: " + s;
UnaryOperator<String> spellCheckProcessing = s -> s.replaceAll("labda", "lambda")

Function<String, String> pipeline = headerTextProcessing.andThen(spellCheckProcessing);
String result = pipeline.apply("Aren't labdas really sexy?!!");

 

 

4. 람다 테스팅

이번엔 람다를 테스팅하는 방법에 대해 살펴보겠습니다.

 

1) 보이는 람다 표현식의 동작 테스팅

 

람다는 결국 익명함수로 테스트 코드 작성 시 호출 할 수 없습니다.

 

이런 경우, 람다를 필드에 저장하여 재사용할 수 있도록 하여 람다의 로직 테스트를 수행할 수 있습니다.

 

2) 람다를 사용하는 메서드의 동작에 집중하라

 

람다의 목표는 정해진 동작을 다른 메서드에서 사용할 수 있도록 캡슐화하는 것입니다.

 

그렇기 때문에, 테스트 시 람다의 세부구현이 아닌 인풋과 람다를 통한 아웃풋으로 테스트를 진행하여 검증하면 됩니다.

 

3) 복잡한 람다를 개별 메서드로 분할하기

 

만약 람다 표현식이 복잡하여 테스트하기가 힘든경우에는,

위에서 살펴본곳처럼 메서드 참조로 수정하여 일반 메서드를 테스트하듯이 람다표현식을 테스트할 수 있습니다.

 

5. 디버깅

디버깅 시 개발자는 아래 두가지를 먼저 확인해야 합니다.

 

  • 스택 트레이스
  • 로깅

하지만, 람다 표현식와 스트림은 디버깅하기가 좀 까다롭습니다.

 

1) 스택 트레이스 확인

 

프로그램이 메서드를 호출할 때마다 호출위치, 호출할 때의 인수값, 호출된메서드의 지역변수 등 정보들이 생성되며 스택프레임에 저장됩니다.

 

스택트레이스는 이 스택프레임에서 정보를 가져와 보여주게됩니다.

 

하지만 람다의 경우 스택 트레이스가 기존과는 다르게 나오게 되며, 메서드 이름이 나오지 않습니다.

그 이유는, 람다 표현식은 이름이 없어 컴파일러가 이름을 자동을 생성하게 되면서 생겨나게 되기 때문입니다.

 

하지만 클래스와 같은 곳에 선언되어 있는 메서드 참조시에는 메서드 이름이 스택트레이스에 포함하게 됩니다.

 

안타깝게도 이것은 아직 fix되지 않은 점으로, 개발 시 스택트레이스의 확인에 불편함이 있을 수 있습니다.

 

2) 정보 로깅

 

스트림에서는 마지막 연산이 호출되는 순간 전체 스트림이 소비되어 로깅하기에 매우 까다롭습니다.

 

하지만, 이를 위해 peek 메서드를 제공하고 있습니다.

 

peek 메서드의 경우 실제로 스트림 요소를 소비하지 않고, 자신이 확인한 요소를 파이프라인의 다음 연산으로 그대로 전달하게 됩니다.

 

아래는 예제입니다.

 

List<Integer> numbers = List.of(1,2,3,4,5);
numbers.stream()
        .peek(System.out::println)
        .map(x -> x + 17)
        .collect(Collectors.toList());

 

6. 마무리

 

이번 포스팅에서는 Chapter9 리팩터링, 테스팅, 디버깅에 대해 진행하였습니다.

다음에는 Chapter10 람다를 이용한 도메인 전용 언어에 대해 포스팅하겠습니다.

반응형
반응형

1. 서론

 

이번 포스팅에서는 Chapter8의 컬렉션 API 개선 에 대해 진행하도록 하겠습니다.

 

 

2. 컬렉션 팩토리

자바 9에서는 작은 컬렉션 객체를 쉽게 만들 수 있는 팩토리 메서드를 제공하고 있습니다.

 

기존 작은 List 객체를 간단히 생성할 때는 Arrays.asList() 를 통해 만들었습니다.

Arrays 를 통해 만든 리스트에는 새 요소를 추가하거나 삭제는 불가능하지만 갱신이 가능했습니다.

 

자바 9에서는 컬렉션에 Immutable 한 객체를 생성할 수 있도록 팩토리 메서드를 제공하고 있습니다.

 

1) 리스트 팩토리

 

List.of 팩토리 메서드를 통해 간단하게 리스트를 만들 수 있습니다.

 

아래는 예제입니다.

 

List<String> friends = List.of("Raphael", "Olivia", "Thibaut");
System.out.println(friends);

 

List.of 로 만든 리스트의 경우에는 추가, 삭제, 변경 모두 되지 않는 리스트입니다.

또한, null은 허용하지 않아 의도치 않은 버그를 방지할 수 있습니다.

 

of 메서드의 경우 가비지 컬렉션 비용으로 인해 10개까지는 고정인자로 받을 수 있도록 오버로딩이 되어 있습니다.
10개 이상으로는 ... 문법을 통해 가변인자로 받고 있습니다.
이것은 Set, Map에서도 동일하게 적용되어 있습니다.

 

2) 집합 팩토리

 

List.of와 비슷하게 Set.of 를 통해 간단히 Set을 만들 수 있습니다.

 

 

3) 맵 팩토리

 

Map의 경우 자바 9에서 만들 수 있는 방법을 2가지 제공하고 있습니다.

 

1. Map.of

 

첫째로 Map.of 를 통해 만들 수 있습니다.

 

아래는 예제입니다

 

Map<String, Integer> ageOfFriends
	= Map.of("Raphael", 30, "Olivial", 25, "Thibaut", 26);
System.out.println(ageOfFriends);

 

위 방법은 10개 이하의 키와 값을 통해 만들 때 유용합니다.

 

10개가 넘어가는 Map을 만들때는 Map.ofEntries 를 사용해야합니다.

 

Map<String, Integer> ageOfFriends = Map.ofEntries(
        Map.entry("Raphael", 30),
        Map.entry("Olivial", 25),
        Map.entry("Thibaut", 26)
);

 

3. 리스트와 집합 처리

자바8 에서는 List, Set 인터페이스에 아래와 같은 메서드가 추가되었습니다.

 

  • removeIf: 프레디케이트를 만족하는 요소를 제거
  • replaceAll : 리스트에서 이용할 수 있는 기능으로 UnaryOperator 함수를 이용해 요소를 바꿈
  • sort : List 인터페이스에서 제공하는 기능으로 리스트를 정렬한다.

위 메서드는 새로운 컬렉션을 반환하는것이 아니라 메서드를 호출한 컬렉션 자체를 바꿉니다.

 

컬렉션을 바꾸는 동작은 에러를 유발하는 위험한 코드입니다.

이를 위해 위와 같은 메서드가 만들어지게 되었습니다.

 

1) removeIf

 

아래는 리스트에서 숫자로 시작되는 값을 삭제하는 코드입니다.

 

for(Transaction transaction : transactions) {
    if(Character.isDigit(transaction.getReferenceCode().charAt(0))) {
        transactions.remove(transaction);
    }
}

 

아쉽게도 위 코드는 ConcurrentModificationException을 발생시킵니다.

 

내부적으로 for-each는 Iterator 객체를 사용하게 되는데 위와같이 remove를 하게 되면,

Iterator와 컬렉션의 상태가 동기화가 되지 않기 때문입니다.

 

하지만 자바8에서 제공하는 removeIf를 통해 이 문제를 해결할 수 있습니다. 또한, 더욱 readable해진 코드가 탄생합니다.

 

아래는 removeIf를 적용한 예제 코드입니다.

 

transactions.removeIf(transaction -> Character.isDigit(transaction.getReferenceCode().charAt(0)));

 

2) replaceAll 메서드

 

자바8에서는 변경관련하여 replcateAll이라는 메서드를 제공하고 있습니다.

 

아래와 같이 stream API를 통해서도 가능합니다.

List<String> referenceCodes = List.of("a12", "C14", "b13");
referenceCodes
        .stream()
        .map(code -> Character.toUpperCase(code.charAt(0)) + code.substring(1))
        .collect(Collectors.toList());

 

하지만 위의 코드의 경우에는 새로운 컬렉션이 생성될 뿐더러 코드가 replcaceAll보다는 길게 만들어집니다.

 

아래는 replcaceAll를 적용한 코드입니다.

 

referenceCodes.replaceAll(code -> Character.toUpperCase(code.charAt(0)) + code.substring(1));

 

 

 

반응형

 

 

 

4. 맵 처리

자바8에서는 Map 인터페이스에도 몇가지 디폴트 메서드가 추가되었습니다.

 

1) forEach 메서드

 

아래는 기존 Map을 순회하는 코드입니다.

 

for(Map.Entry<String, Integer> entry : ageOfFriends.entrySet()) {
    String friend = entry.getKey();
    Integer age = entry.getValue();
    System.out.println(friend + " is " + age + " years old");
}

 

자바 8에서는 forEach 메서드를 제공하여 아래와 같이 간편하게 코드를 사용할 수 있도록 되었습니다.

 

ageOfFriends.forEach((friend, age) -> System.out.println(friend + " is " + age + " years old"));

 

2) 정렬 메서드

 

정렬 관련해서드 아래와 같이 2개 메서드를 제공하고 있습니다

 

  • Entry.comparingByValue
  • Entry.comparingByKey

아래는 Entry.comparingByKey의 예제입니다.

 

Map<String, String> favoriteMovies = Map.ofEntries(
        Map.entry("Raphael", "Star Wars"),
        Map.entry("Cristina", "Matrix"),
        Map.entry("Olivia", "James Bond")
);

favoriteMovies
        .entrySet()
        .stream()
        .sorted(Map.Entry.comparingByKey())
        .forEachOrdered(System.out::println);

 

3) getOrDefault 메서드

 

Map에 키가 없는경우, null이 아닌 지정한 디폴트 값을 가져오도록 getOrDefault 메서드를 제공하고 있습니다.

 

이는, NullPointerException을 방지하기 위한 null 체크로직이 없어져 개발자에게 큰 이득을 주게되었습니다.

 

아래는 예제입니다.

 

Map<String, String> favoriteMovies = Map.ofEntries(
        Map.entry("Raphael", "Star Wars"),
        Map.entry("Cristina", "Matrix"),
        Map.entry("Olivia", "James Bond")
);

System.out.println(favoriteMovies.getOrDefault("Olivia", "Matrix"));
System.out.println(favoriteMovies.getOrDefault("Thibaut", "Matrix"));

 

getOrDefault 도 키가 존재하지만 값이 null인 경우에는 null을 반환하는 점을 유의해야 합니다.

 

4) 계산 패턴

 

맵에 키 존재여부에 따라 동작을 수행 후 결과를 저장해야 하는 경우가 있습니다.

 

이를 위해 아래 3개 메서드를 제공하고 있습니다.

 

  • computeIfAbsent : 제공된 키에 해당하는 값이 없으면, 키를 이용해 새 값을 계산하고 맵에 추가
  • computeIfPresent : 제공된 키가 존재하면 새 값을 계산하고 맵에 추가
  • compute : 제공된 키로 새 값을 계산하여 맵에 저장

아래는 computeIfAbsent 예제입니다.

 

Map<String, List<String>> favoriteMovies = new HashMap<>();
favoriteMovies.computeIfAbsent("Raphael", name -> new ArrayList()).add("Star Wars");

 

위와 같이 Map<String, List<String>> 같은 자료구조를 사용할때 유용하게 사용할 수 있는것을 볼 수 있습니다.

 

computeIfPresent는 computeIfAbsent와 반대라고 생각하시면 됩니다.
단, 한가지 특이점은 computeIfPresent의 경우 반환값이 null인 경우 매핑을 해제하게 됩니다.

 

5) 삭제 패턴

 

자바8 에서는 기존에 있는 remove메서드를 오버로드하여, 키가 특정한 값과 연관되었을때만 지우도록 제공하고 있습니다.

 

아래는 예제입니다.

 

favoriteMovies.remove(key, value);

 

6) 교체 패턴

 

맵의 항목을 바꾸는데 사용할 수 있는 메서드도 제공하고 있습니다.

 

  • replaceAll : BiFunction을 적용한 결과로 각 항복의 값을 교체
  • Replace : 키가 존재하면 맵의 값을 바꿈

아래는 replaceAll의 예제 코드입니다.

 

Map<String, String> favoriteMovies = new HashMap<>();
favoriteMovies.put("Raphael", "Star Wars");
favoriteMovies.put("Olivia", "James Bond");
favoriteMovies.replaceAll((friend, movie) -> movie.toUpperCase());

 

7) 합침

 

두개의 Map을 합치는 경우에는 putAll 메서드를 제공하고 있습니다. 

하지만, 이 메서드의 경우에는 두 Map에 중복된 키가 없는 경우에만 가능합니다.

 

이를 위해 자바 8에서는 merge 메서드를 제공하고 있습니다.

 

아래는 merge 메서드를 사용한 예제입니다.

 

Map<String, String> family = Map.ofEntries(
        Map.entry("Teo", "Star Wars"),
        Map.entry("Cristina", "James Bond")
);

Map<String, String> friends = Map.ofEntries(
        Map.entry("Raphael", "Star Wars"),
        Map.entry("Cristina", "Matrix")
);

Map<String, String> everyOne = new HashMap<>(family);
friends.forEach(
        (k, v) -> everyOne.merge(k, v, (movie1, movie2) -> movie1 + " & " + movie2));

 

3번째 인자로 중복키가 있는 경우 어떻게 처리할건지에 대해 받고 있는것을 볼 수 있습니다.

 

5. 개선된 ConcurrentHashMap

ConcurrentHashMap은 내부 자료구조의 특정 부분만 잠궈 동시추가, 갱신 작업을 허용하는 동시성에 친화적인 Map입니다.

 

1) 리듀스와 검색

 

ConcurrentHashMap에서는 스트림과 비슷하게 아래 3개의 연산을 지원하고 있습니다.

 

  • forEach : 각 (키, 값) 쌍에 주어진 액션 실행
  • reduce : 모든 (키, 값) 쌍을 제공된 리듀스 함수를 이용해 결과로 합침
  • search : 널이 아닌 값을 반환할 때까지 각 (키, 값) 쌍에 함수를 적용

아래는 위 연산을 수행할 수 있도록 제공하는 메서드입니다.

 

  • 키 값으로 연산 -> forEach, reduce, search
  • 키로 연산 -> forEachKey, reduceKeys, searchKeys
  • 값으로 연산 -> forEachValue, reduceValues, searchValues
  • Map.Entry 객체로 연산 -> forEachEntry, reduceEntries, searchEntries

위 연산들은 모두 ConcurrentHashMap의 상태를 잠그지 않고 연산을 수행하게 됩니다.

따라서, 연산이 수행된는 동안 변경에 대해 의존이 있으면 안됩니다.

 

2) 계수

 

기존 Map의 사이즈는 size 메서드를 사용했습니다.

하지만 size는 int형으로서 int 범위를 넘어서는 상황을 대처하기 위해 mappingCount 함수를 제공하고 있습니다.

 

mappingCount는 long형 입니다.

 

6. 마무리

 

이번 포스팅에서는 Chapter8 컬렉션 API 개선에 대해 진행하였습니다.

다음에는 Chapter9 리팩터링, 테스팅, 디버깅에 대해 포스팅하겠습니다.

반응형
반응형

1. 서론

이번 포스팅에서는 Chapter7의 병렬 데이터 처리와 성능 에 대해 진행하도록 하겠습니다.

 

 

2. 병렬 스트림

java 8 에서는 병렬 처리를 간편하게 제공하고 있습니다.

예로 컬렉션에서 parallelStream를 통해 간편히 병렬 스트림 처리가 가능합니다.

 

1) 순차 스트림을 병렬 스트림으로 변환하기

 

순차 스트림에 parallel 메서드를 통해 병렬스트림으로 변경이 가능합니다.

 

아래는 그 예제입니다.

 

public long parallelSum(long n) {
    return Stream.iterate(1L, i -> i +1)
            .limit(n)
            .parallel()
            .reduce(0L, Long::sum);
}

 

parallel 처리 동작방식은 각 쓰레드에게 분할한 청크를 전달하여 병렬로 수행하도록 하는 것입니다.

 

아래는 그림으로 수행과정을 나타낸 것입니다.

 

 

추가로, 병렬에서 순차 스트림으로 다시 바꿀때에는 sequential 메서드를 사용하면 됩니다.

 

만약, parallel과 sequential 두개를 모두 사용했을 때에는 최종적으로 호출된 메서드로 전체 스트림 파이프라인에 영향이 미치게 됩니다.

 

병렬스트림으로 사용하는 쓰레드는 ForkJoinPool 을 사용하며 갯수는 Runtime.getRuntime().availableProcessors() 의 반환값으로 결정됩니다.
전역적으로 쓰레드수를 변경하고 싶을때는 아래와 같이 시스템 설정을 해주시면 됩니다.
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "12")

 

 

 

2) 병렬 스트림 효과적으로 사용하기

 

병렬 프로그래밍은 항상 유심히 문제점이 없는지 파악한 후 개발해야 합니다.

 

java에서 제공해주는 병렬 스트림도 마찬가지입니다.

parallel을 사용하는 부분은 올바른 처리가 이루어지는지와 성능은 올라갔는지 테스트를 해야합니다.

java의 스트림은 내부로직이 복잡하기 때문에, 무분별하게 사용하게 되면 성능이 더욱 악화가 될 수 있습니다.

 

Collectors를 통해 리듀싱 작업을 수행하는 경우에는 toConcurrentMap과 같이 ConcurrentMap 클래스가 리턴타입일때만 병렬로 수행이 됩니다.
결국, 다른 toList와 같은 리턴타입은 병렬로 수행되지 않으므로 parallelStream으로 처리할 경우 성능이 악화 될 수 있습니다.

 

병렬로 처리해야하는지에 대한 결정은 아래와 같은 부분이 도움이 될 수 있습니다.

 

  1. 확신이 서지 않는다면 직접 성능 측정.
  2. 박싱 타입 유의
  3. 순차스트림보다 병렬 스트림에서 성능이 떨어지는 연산 고려 - ex) limit과 같이 순서에 의존하는 연산.
  4. 스트림 전체 파이프라인 연산 비용 고려
  5. 소량 데이터의 경우 병렬 스트림 제외
  6. 스트림으로 구성하는 자료구조가 적절한지 고려
  7. 최종 연산의 병합 과정 비용 고려

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

3. 포크/조인 프레임워크

포크/조인 프레임워크는 자바 7에서 추가되었으며, 자바 8의 병렬스트림에서는 내부적으로 포크/조인 프레임워크로 처리하고 있습니다.

 

포크/조인 프레임워크는 재귀를 통해 병렬화 작업을 작게 분할한 다음 ForkJoinPool의 작업자 스레드에 분산 할당하는 방식입니다.

 

1) RecursiveTask 활용

 

스레드 풀을 이용하려면 RecursiveTask<R>의 서브클래스를 만들어야 합니다.

여기서 R은 제네릭으로 결과타입 또는 결과가 없을때의 타입을 의미합니다.

 

RecursiveTask를 구현하기 위해서는 추상메서드인 compute메서드를 구현해야합니다.

compute 메서드는 태스크를 서브태스크로 분할하는 로직과 더 이상 분할할 수 없을 때 개별 서브태스크의 결과를 생산할 알고리즘을 정의해야 합니다.

 

아래는 compute 메서드의 의사코드입니다.

 

if (태스크가 충분히 작거나 더 이상 분할 할 수 없으면) {
	순차적으로 태스크 계산
} else {
    태스크를 두 서브태스크로 분할
    태스크가 다시 서브태스크로 분할되도록 이 메서드를 재귀적으로 호출함
    모든 서브태스크의 연산이 완료될 때까지 기다림
    각 서브태스크의 결과를 합침
}

 

아래는 long[] 의 sum을 구하는 것을 포크/조인 프레임워크를 사용하는 예제입니다.

 

public static long forkJoinSum(long n) {
    long[] numbers = LongStream.rangeClosed(1, n).toArray();
    ForkJoinTask<Long> task = new ForkJoinSumCalculator(numbers);
    return new ForkJoinPool().invoke(task);
}

public class ForkJoinSumCalculator extends RecursiveTask<Long> {
    private final long[] numbers;
    private final int start;
    private final int end;
    public static final long THRESHOLD = 10_000;
    public ForkJoinSumCalculator(long[] numbers) {
        this(numbers, 0, numbers.length);
    }
    @Override
    protected Long compute() {
        int length = end - start;
        if(length <= THRESHOLD) {
            return computeSequentially();
        }
        ForkJoinSumCalculator leftTask = new ForkJoinSumCalculator(numbers, start, start + length/2);
        leftTask.fork();
        
        ForkJoinSumCalculator rightTask = new ForkJoinSumCalculator(numbers, start + length/2, end);
        
        long rightResult = rightTask.compute();
        long leftResult = leftTask.join();
        
        return leftResult + rightResult;
    }
    private long computeSequentially() {
        long sum = 0;
        for(int i = start; i< end; i++) {
            sum += numbers[i];
        }
        return sum;
    }
}

 

위 ForkJoinSumCalculator의 경우에는 배열의 길이가 10,000 보다 큰것은 반으로 자르면서 분할시키고 있습니다.

그리고 분할을 재귀로 계속 수행 후 결과를 모아서 반환하고 있습니다.

 

재귀를 통해 분할된 작업들은 ForkJoinPool에 넘겨져 병렬로 수행되어 집니다.

 

위 예제와 같이, 병렬로 수행 시 결과에 영향이 가지 않는 연산에서만 사용해야 합니다.

 

2) 포크/조인 프레임워크를 제대로 사용하는 방법

 

아래는 포크/조인 프레임워크를 효과적으로 사용하기 위해 알아야 할 점입니다.

 

  1. 두 서브 태스크가 모두 시작된 다음에 join을 호출해야 합니다.
  2. RecursiveTask 내에서는 ForkJoinPool의 invoke메서드 대신 fork나 compute 메서드를 직접 호출해야 합니다.
  3. 서브 태스크에서 fork나 compute를 통해 ForkJoinPool의 일정을 조절할 수 있습니다.
  4. 포크/조인 프레임워크를 이용하는 병렬계산은 디버깅하기 어렵습니다.
  5. 멀티코어에 포크/조인 프레임워크를 사용하는 것이 순차 처리보다 무조건 빠르지는 않습니다.

 

3) 작업 훔치기

 

멀티 쓰레드로 처리를 하다보면 고루 처리량을 할당했더라도, 각 쓰레드마다 완료 시점이 다릅니다.

이 경우, 노는 쓰레드가 발생하게되며 성능이 생각한것만큼 좋아지지 않게됩니다.

 

이를 위해, 포크/조인 프레임워크는 작업훔치기라는 기법을 사용하고 있습니다.

간단히 말해서, task들을 큐가 아닌 덱에 넣고 노는 쓰레드는 일하는 쓰레드의 덱의 꼬리에서 task가 있다면 훔쳐와 동작하는 것입니다.

 

그렇기 때문에, task는 적절히 작은 양으로 분배가 되도록 해야합니다.

 

4. Spliterator 인터페이스

자바 8은 Spliterator라는 새로운 인터페이스를 제공합니다.

 

Spliterator는 스트림을 분할하여 처리할때 사용하며,

자바 8은 컬렉션 프레임워크에 포함된 모든 자료구조에 사용할 수 있는 디폴트 Spliterator 구현을 제공하고 있습니다.

 

아래는 Spliterator 인터페이스에서 필수로 구현해야하는 메서드만을 모아놓은 명세입니다.

 

public interface Spliterator<T> {
    boolean tryAdvance(Consumer<? super T> action);
    Spliterator<T> trySplit();
    long estimateSize();
    int characteristics();
}

 

tryAdvance 는 Spliterator의 요소를 하나씩 순차적으로 소비하면서 탐색해야 할 요소가 남아 있으면 참을 반환합니다.

trySplit Spliterator의 일부 요소를 분할해서 두 번째 Spliterator를 생성하는 메서드입니다.

estimateSize 는 메서드로 탐색해야 할 요소 수 정보를 제공할 수 있습니다.

characteristicsSpliterator에서 정의한 int형으로 각 값은 의미하고 있는것이 있습니다. - ex) 16 = Spliterator.ORDERED

 

 

1) 분할 과정

 

Spliterator는 trySplit 메서드를 통해 스트림 요소를 재귀적으로 분할합니다.

 

아래는 Spliterator를 통해 분할하는 과정의 그림입니다.

 

 

이 과정은 characteristics 메서드로 정의하는 Spliterator의 특성에 영향을 받습니다.

 

2) Spliterator 특성

 

characteristics 메서드는 Spliterator 자체의 특성 집합을 포함하는 int를 반환합니다.

 

아래는 Spliterator 특성에 대한 표입니다.

 

특성 의미
ORDERED 리스트처럼 요소에 정해진 순서가 있으므로 순서에 유의해야 함.
DISTINCT x, y 두 요소를 방문했을 시 x.equals(y) 는 항상 false를 반환해야 함.
SORTED 탐색된 요소는 미리 정의된 정렬 순서를 따라야 함.
SIZED 크기가 알려진 소스로 Spliterator를 생성했으므로 estimateSize는 정확한 값을 반환함.
NON-NULL 탐색하는 모든 요소는 null이 아님.
IMMUTABLE Spliterator는 불변. 즉 탐색 중간에 추가, 삭제가 불가능함.
CONCURRENT 동기화 없이 Spliterator의 소스를 여러 쓰레드에서 동시에 고칠 수 없음.
SUBSIZED Spliterator 그리고 분할되는 모든 Spliterator는 SIZED 특성을 갖고 있음.

 

5. 마무리

이번 포스팅에서는 Chapter 7인 병렬 데이터 처리와 성능에 대해 진행하였습니다.

다음에는 Chapter 8인 컬렉션 API 개선에 대해 포스팅하겠습니다.

반응형

'Programming > ModernJavaInAction' 카테고리의 다른 글

(9) 리팩터링, 테스팅, 디버깅  (0) 2020.04.13
(8) 컬렉션 API 개선  (0) 2020.04.13
(6) 스트림으로 데이터 수집  (0) 2020.04.04
(5) 스트림 활용  (0) 2020.04.04
(4) 스트림 소개  (0) 2020.03.28
반응형

1. 서론

이번 포스팅에서는 Hbase가 제공하는 클라이언트 API 중 기본적인 기능에 대해 알아보겠습니다.

 

2. 일반 정보

Hbase에 접근하는 주요 클라이언트 인터페이스는 org.apache.hadoop.hbase.client 패키지에 있는 HTable 입니다.

 

HTable은 Hbase에 데이터를 저장, 삭제의 일을 할 수 있도록 제공합니다.

 

단, HTable을 사용할 시 주의할 점이 있습니다.

 

HTable은 인스턴스화될 시 메타(.META.) 테이블을 스캔하여 테이블이 있는지, 있다면 활성화 되있는지를 확인하고

그 외에도 부수적인 작업들을 수행하기 때문에 인스턴스화 작업은 느립니다.

 

따라서, HTable 인스턴스화는 되도록 한번만 수행하며 스레드당 하나씩만 생성하도록 하는것이 좋습니다.

그리고, 한번 생성된 HTable는 어플리케이션이 끝날때까지 재사용하는 편이 성능상 이점을 볼 수 있습니다.

 

3. CRUD 기능

HTable의 CRUD 역할에 대해 소개하겠습니다.

 

1) Put 메소드

 

데이터를 적재할 때 사용하는 API입니다.

단일로우와 멀티로우를 적재할 수 있도록 모두 제공하고 있습니다.

 

 

1. 단일 Put

 

put 메서드의 명세는 아래와 같습니다.

 

void put(Put put) throws IOException

 

인자로 받는 Put은 하나 또는 리스트 형태로 받을 수 있습니다.

 

아래는 Put의 생성자 메서드 명세입니다.

 

Put(byte[] row)
Put(byte[] row, RowLock rowLock)
Put(byte[] row, long ts)
Put(byte[] row, long ts, RowLock rowLock)

 

Hbase의 로우는 고유한 키입니다.

 

이 키는 타입이 자바의 바이트 배열로 어떠한 값이 들어와도 상관이 없습니다.

단, 로우 키는 사전편찬식으로 정렬된다는 것을 명심하고 키설계를 하여 사용해야 합니다.

 

아래는 어떠한 타입이든 바이트 배열로 만들어주는 헬퍼 클래스인 Bytes의 메서드입니다.

 

static byte[] toBytes(ByteBuffer bb)
static byte[] toBytes(String s)
static byte[] toBytes(boolean b)
static byte[] toBytes(long val)
static byte[] toBytes(float f)
static byte[] toBytes(int val)

 

Put 인스턴스를 생성 한 다음에는 컬럼패밀리, 퀄리파이어, 값, 타임스탬프 등을 추가할 수 있습니다.

 

아래는 Put의 데이터 추가 메서드입니다.

 

Put add(byte[] family, byte[] qualifier, byte[] value)
Put add(byte[] family, byte[] qualifier, long ts, byte[] value)
Put add(KeyValue kv) throws IOException

 

add 메서드를 한번 수행할 때마다 하나의 컬럼이 추가됩니다.

 

KeyValue 클래스의 경우에는 하나의 고유한 셀을 나타내는 고급 클래스입니다.

 

이 클래스를 얻기 위해서는 get 메서드를 통해 얻을 수 있습니다.

 

아래는 KeyValue를 가져오는 get 메서드입니다.

 

List<KeyValue> get(byte[] family, byte[] qualifier)
Map<byte[], List<KeyValue>> getFamilyMap()

 

특정 셀이 존재하는지 알아볼때에는 아래와 같이 get이 아닌 has 메서드를 제공합니다.

 

boolean has(byte[] family, byte[] qualifier)
boolean has(byte[] family, byte[] qualifier, long ts)
boolean has(byte[] family, byte[] qualifier, byte[] value)
boolean has(byte[] family, byte[] qualifier, long ts, byte[] value)

 

아래는 put을 통해 데이터를 적재하는 간단한 예제입니다.

 

public class PutExample {

    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();

        HTable hTable = new HTable(conf, "testtable");

        Put put = new Put(Bytes.toBytes("row1"));
        
        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));

        hTable.put(put);
    }
}

 

위의 Configuration은 org.apache.hadoop.conf 패키지에 존재하며 설정 정보를 주입하는 역할입니다.

 

기존 설정정보는 hbase-site.xml에 기입해야하지만, 동적으로 변경해야 할때는  Configuration를 사용하면 됩니다.

 

아래는 주키퍼 쿼럼 정보를 동적으로  Configuration를 사용하여 세팅하는 예제입니다.

 

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk1.foo.com,zk2.foo.com,");

 

put의 경우 타임스탬프를 부여하여 각 셀에 대하여 버저닝을 할 수도 있습니다.

부여하지 않을 시에는 로우가 저장소에 추가되는 순간에 해당 리전서버의 시각으로 자동 부여가 됩니다.

 

Hbase의 경우, 셀은 타임스탬프값을 기준으로 정렬되어져 저장됩니다.

 

 

2. KeyValue 클래스

 

코드상에서 KeyValue 인스턴스를 처리해야 하는 경우가 종종 있습니다.

때문에, KeyValue 클래스에 대해 간단히 살펴보겠습니다.

 

우선, KeyValue 클래스는 특정 셀의 정보를 가지고 있습니다.

특정 셀의 정보는 로우 키, 컬럼패밀리, 컬럼 퀄리파이어, 타임스탬프를 의미합니다.

 

메서드로는 아래와 같이 있습니다.

 

// 아래 3개는 KeyValue 인스턴스에 저장되어 있는 전체 바이트 배열에 관한 메서드입니다.
byte[] getBuffer()
int getOffset()
int getLength()

// 아래 2개는 로우키와 데이터가 저장된 워시 좌표 정보의 바이트 배열을 반환하는 메서드입니다.
byte[] getRow()
byte[] getKey()

 

또한, KeyValue 클래스는 내부적으로 Comparator를 통해 값에 대해서 커스텀하게 정렬을 할 수 있도록 제공합니다.

 

아래는 KeyValue 클래스에서 제공하는 Comparator 종류입니다.

 

비교 연산자 설명
KeyComparator KeyValue 2개의 키를 비교합니다.
즉, getKey 메서드를 통해 원시 바이트 배열을 비교합니다.
KVComparator 원시 형태의 KeyComparator를 감싼 형으로서,
KeyValue 2개를 비교할 때 사용합니다.
RowComparator getRow 로 얻은 값으로 비교합니다.

 

KeyValue 클래스는 type이라는 필드를 가지고 있습니다.

이 type은 KeyValue에 하나의 차원이 더 추가되는것과 같습니다.

 

type 에 사용가능한 값은 아래와 같습니다.

 

유형 설명
Put 해당 KeyValue 인스턴스가 일반적인 Put 연산임을 의미합니다.
Delete 해당 KeyValue 인스턴스가 일반적인 Delete 연산임을 의미합니다.
DeleteColumn Delete와 같지만, 더 광범위하게 전체 컬럼을 삭제한다는 의미입니다.
DeleteFamily Delete와 같지만, 더 광범위하게 전체 컬럼패밀리 및 그에 속한 모든 컬럼을 삭제한다는 의미입니다.

 

마지막으로 KeyValue에 정보를 출력하고 싶으실때는 아래 메서드를 사용하시면 됩니다.

 

String toString()

 

아래는 toString 메서드의 결과 형식을 나타냅니다.

 

<row-key>/<family>:<qualifier>/<version>/<type>/<value-length>

 

 

3. 클라이언트 측 쓰기 버퍼

 

Hbase의 경우 쓰기 연산은 RPC를 통해 이루어 집니다.

 

이 RPC는 remote procedure call로서 갯수가 적을때는 괜찮지만 초당 수천개의 값을 테이블에 저장하는 데에는 적합하지 않습니다.

 

그로인해 Hbase 클라이언트는 내부적으로 쓰기 버퍼를 두어 한번의 RPC로 서버에 데이터를 전송하도록 제공하고 있습니다.

 

쓰기버퍼의 사용여부는 아래와 같이 설정할 수 있습니다.

현재 포스팅을 작성하는 때에는 아래와 같이 버퍼 관련은 deprecated 되었습니다. -> 최신 버전에서는 아예 메서드에서 제외되었습니다.
대신, BufferedMutator 클래스를 별도로 제공하여 아래와 같은 기능을 제공하고 있습니다.

지금은 책내용을 기반으로 진행하도록 하겠습니다.

 

void setAutoFlush(boolean autoFlush)
boolean isAutoFlush()

 

디폴트는 비활성 상태입니다.

활성으로 하기 위해서는 아래와 같이 setAutoFlush를 false로 설정합니다.

 

table.setAutoFlush(false)

 

위와 같이 false로 설정된 후부터는 put 메서드를 호출하더라도 클라이언트측 메모리 버퍼에 저장되고 서버로는 실제 전송이 이루어 지지 않습니다.

 

서버로 RPC를 날리기 위해서는 아래와 같은 메서드를 호출하면 됩니다.

 

void flushCommits() throws IOException

 

쓰기 버퍼의 경우 클라이언트에서 내부적으로 알아서 비워주기 때문에 크게 고려하지 않아도 됩니다.

 

추가로, 쓰기 버퍼의 크기도 조정이 가능한데 방법은 아래와 같습니다.

 

long getWriteBufferSize()
long setWriteBufferSize(long writeBufferSize) throws IOException

 

물론, 명시적으로 버퍼를 비울수도 있습니다.

 

개발자가 작성한 코드로 인해서 버퍼가 비워지는 경우는 아래와 같습니다.

 

  • flushCommits 호출 시
  • autoFlush가 true의 경우 put 메서드 호출 시
  • setWriteBufferSize 호출 시

 

아래는 쓰기 버퍼를 사용한 예제 입니다.

 

    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        HTable hTable = new HTable(conf, "testtable");
        
        System.out.println("Auto flush: " + hTable.isAutoFlush());

        hTable.setAutoFlush(false);
        
        
        Put put1 = new Put(Bytes.toBytes("row1"));
        put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
        hTable.put(put1);

        Put put2 = new Put(Bytes.toBytes("row2"));
        put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val2"));
        hTable.put(put2);

        Put put3 = new Put(Bytes.toBytes("row3"));
        put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val3"));
        hTable.put(put3);

        Get get = new Get(Bytes.toBytes("row1"));
        Result res1 = hTable.get(get);
        System.out.println("Result: " + res1);
        
        hTable.flushCommits();
        
        Result res2 = hTable.get(get);

        System.out.println("Result: " + res2);
    }

 

위 결과 출력은 아래와 같습니다.

 

Auto flush: true
Result: keyvalues=None
Result: keyvalues={ro1/colfam1/qual1/123412358/Put/vlen=4}

 

위에서 설명한것과 같이 버퍼에 쓰기만 하고 서버로 PRC를 하지 않았기 때문에

첫번째 Get에서는 None이 나온것을 볼 수 있습니다.

 

만약 버퍼에 있는 데이터를 보기 위해서는 아래 메서드를 사용하면 됩니다.

 

ArrayList<Put> getWriteBuffer()

 

하지만 이 방법은 멀티쓰레드 환경에서 조심해야 합니다.

이유로는 List에 접근 시 힙 크기를 확인하지 않고 접근하며, 또 버퍼 비우기가 진행되는 도중 다른 쓰레드가 값을 변경할 수 있기 때문입니다.

 

 

4. Put 리스트

 

클라이언트 API는 단일 put이 아닌 List<put>도 처리 가능하도록 제공합니다.

 

void put(List<Put> puts) throws IOException

 

위 메서드를 사용하면 List<Put>으로 데이터 적재가 가능합니다.

 

다만, List에 있는 모든 Put이 성공하지 않을 수 있습니다.

성공하지않은 Put이 있다면 클라이언트는 IOException을 받게됩니다.

 

하지만, Hbase는 List에 있는 put을 이터레이트돌며 적용하기 때문에 하나가 실패한다고 안에 있는것이 모두 실패하지는 않습니다.

 

실패한 Put은 쓰기버퍼에 남아있게 되고 다음 flush 작업에서 재수행하게 됩니다.

만약 데이터가 잘못되어 실패되는 케이스라면 계속 버퍼에 남게되어 재수행을 반복하게 될 것입니다.

 

이를 방지하기 위해서는 수동을 버퍼를 비워줘야 합니다.

 

아래는 일괄 put을 날린 후 try-catch를 통해 실패가 발생하게 있다면 명시적으로 버퍼를 비우는 예제 코드입니다.

 

List<Put> puts = new ArrayList<>();
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
puts.add(put1)

Put put2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("BOGUS"), Bytes.toBytes("qual1"), Bytes.toBytes("val2"));
puts.add(put2)

Put put3 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val3"));
puts.add(put3)

Put put4 = new Put(Bytes.toBytes("row2"));
puts.add(put4)
try {
    hTable.put(puts);
} catch (Exception e) {
    hTable.flushCommits();    
}

 

추가로, 리스트 기반의 입력시에는 Hbase 서버에서 입력 연산의 순서가 보장되지 않습니다.

 

 

5. 원자적 확인 후 입력 연산

 

Hbase에서는 원자적 확인 후 입력이라는 특별한 기능을 제공합니다.

 

이 기능은 특정한 조건에 만족하는 경우 put 연산을 수행할 수 있도록합니다.

반환값으로는 boolean 값으로 put 연산이 수행되었는지의 여부를 의미합니다.

 

사용법으로는 아래와 같습니다.

최신 버전에서는 deprecated 되어 있습니다.

 

boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException 
boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException
boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException

 

 

2) Get 메서드

 

Hbase 클라이언트는 데이터를 읽어오는 Get 메서드를 제공합니다.

 

 

1. 단일 Get

 

특정 값을 반환받는데 사용하는 메서드입니다.

 

Result get(Get get) throws IOException

 

put과 유사하게 get 메서드도 전용 Get 클래스를 인자로 받고 있습니다.

 

Get 클래스의 생성자 메서드는 아래와 같습니다.

 

Get(byte[] row)
Get(byte[] row, RowLock rowLock)

 

아래는 한 로우에대해서 읽는 데이터 범위를 줄이기위해 필요한 보조적인 메서드들입니다.

 

Get addFamily(byte[] family)
Get addColumn(byte[] family, byte[] qualifier)
Get setTimeRange(long minStamp, long maxStamp) throws IOException
Get setTimeStamp(long timestamp)
Get setMaxVersions()
Get setMaxVersions(int maxVersions) throws IOException

 

읽어올때는 위에서 소개한 Bytes 헬퍼 클래스를 통해 byte[]을 원하는 데이터 타입으로 변환이 가능합니다.

 

static String toString(byte[] b)
static boolean toBoolean(byte[] b)
static long toLong(byte[] b)
static float toFloat(byte[] b)
static int toInt(byte[] b)

 

 

 

 

반응형

 

 

 

 

 

2. Result 클래스

 

get 메서드의 반환 타입은 Result 클래스입니다.

 

Result 클래스는 Hbase 데이터인 컬럼패밀리, 퀄리파이어, 타임스탬프, 값 등을 모두 가지고 있으며 내부 메서드로 제공하고 있습니다.

 

byte[] getValue(byte[] family, byte[] qualifier) // 특정 셀 값
byte[] value() // 사전 편찬식으로 정렬된 KeyValue 중에 첫번째 값
byte[] getRow() // 로우키
int size() // KeyValue 갯수
boolean isEmpty() // KeyValue 존재 여부
KeyValue[] raw() // KeyValue 접근을 위한 메서드
List<KeyValue> list() // raw에서 반환되는 KeyValue 배열을 단순히 list 형식으로 바꿔서 반환하는 메서드

 

아래는 부가적으로 컬럼단위로 제공하는 메서드입니다.

 

List<KeyValue> getColumn(byte[] family, byte[] qualifier)
KeyValue getColumnLatest(byte[] family, byte[] qualifier)
boolean containsColumn(byte[] family, byte[] qualifier)

 

아래는 Result의 데이터에 접근을 용이하게 제공하는 Map 지향적인 메서드입니다.

 

NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> getMap()
NavigableMap<byte[], NavigableMap<byte[], byte[]>> getNoVersionMap() // 최근 값만을 반환
NavigableMap<byte[], byte[]> getFamilyMap(byte [] family)

 

위와 같은 메서드는 상황에 따라 적절히 선택하여 사용하면 되며,

이미 Result 인스턴스는 서버에서 클라이언트로 데이터를 받았기 때문에 어떤 메서드를 선택하든지 성능에 이슈는 없습니다.

 

 

3. Get 리스트

 

get은 단일 결과가 아닌 멀티개의 결과도 읽을 수 있도록 제공합니다.

 

Result[] get(List<Get> gets) throws IOException

 

put 리스트와 차이점으로는 예외 발생시 입니다.

get의 경우에는 List<Get>의 동일한 크기의 배열을 반환받는것과 아예 예외를 반환받는 둘 중에 한가지로 동작이 됩니다.

 

 

4. Get 관련 기타 메서드

 

단순히 데이터를 읽어오는 메서드말고도 제공하는 메서드에 대해서 알아 보겠습니다.

 

boolean exists(Get get) throws IOException
Result getRowOrBefore(byte[] row, byte[] family)

 

exists 의 경우에는 메서드명 그대로 지정한 로우가 있는지 확인하기 위한 메서드입니다.

 

getRowOrBefore의 경우에는 지정한 로우가 있으면 반환하지만 없으면 바로 그 이전 로우를 반환합니다.

이전 로우도 없다면 null를 반환합니다.

Hbase는 로우키 기준으로 사전편찬식으로 정렬되어 저장되기 때문에 이러한 기능이 가능합니다.

 

 

3) Delete 메서드

 

이번엔 CRUD중 D에 해당하는 delete 메서드에 대해 알아 보겠습니다.

 

 

1. 단일 Delete

void delete(Delete delete) throws IOException

 

put과 get과 마찬가지로 Delete 전용 클래스를 인자로 받습니다.

 

Delete 클래스의 생성자는 아래와 같습니다.

 

Delete(byte[] row)
Delete(byte[] row, long timestamp, RowLock rowLock)

 

추가로 삭제 범위를 줄일때에는 아래 메서드를 활용하면 됩니다.

 

Delete deleteFamily(byte[] family)
Delete deleteFamily(byte[] family, long timestamp)
Delete deleteColumns(byte[] family, byte[] qualifier)
Delete deleteColumns(byte[] family, byte[] qualifier, long timestamp)
Delete deleteColumn(byte[] family, byte[] qualifier) // 최근 버전만삭제 
Delete deleteColumn(byte[] family, byte[] qualifier, long timestamp) // 지정한 타임스탬프와 일치하는것만 삭제
void setTimestamp(long timestamp) // 전체 컬럼패밀리 대상으로 지정한 타임스탬프 이하인 값들을 삭제

 

2. Delete 리스트

 

delete도 위 put, get과 마찬가지로 단일 뿐만이 아니라 List<Delete>도 제공합니다.

 

void delete(List<Delete> deletes) throws IOException

 

Delete의 예외 방식에 대해 알아보겠습니다.

 

delete()는 인수로 전달했던 List<Delete> 에서 실패한 Delete 객체만이 남게됩니다.

결국, 모두 성공했을때는 인자로 받은 List<Delete>는 텅비게 되어집니다.

 

 

3. 원자적 확인 후 삭제 연산

 

delete에서도 put과 동일하게 원자적으로 확인 후 삭제 연산을 할 수 있도록 제공하고 있습니다.

 

boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final byte[] value, final Delete delete) throws IOException
boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException
boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, final Delete delete) throws IOException

 

value 파라미터에 null을 넣어서 수행하게 되면 해당 컬럼 값이 존재하는지 여부를 검사합니다.

 

 

4. 일괄처리 연산

지금까지는 단일 또는 리스트 기반의 연산을 알아봤습니다.

 

이번에는 여러개의 로우에 대해서 다양한 연산을 일괄처리하는 API에 대해서 알아보겠습니다.

 

클라이언트는 이 일괄처리를 위해 Put, Get, Delete의 조상 클래스인 Row 클래스를 제공합니다.

 

메서드는 아래와 같습니다.

 

void batch(final List<? extends Row> actions, final Object[] results)

 

여러 Row 하위 클래스들을 인자로 받아 처리 후 2번째 인자로 받은 Objects 배열에 결과를 담습니다.

 

또 한가지 메서드가 있는데 명세는 아래와 같습니다.

 

Object[] batch(final List<? extends Row> actions) throws IOException, InterruptedException

 

이 메서드는 예외가 발생하면 throws가 되므로 결과값 배열에는 아무것도 담기지 않게됩니다.

 

위 두 batch 메서드 수행시에는 Put 인스턴스가 쓰기 버퍼에 저장되지 않습니다.

 

아래는 예제입니다.

 

private final static byte[] ROW1 = Bytes.toBytes("row1");
private final static byte[] ROW2 = Bytes.toBytes("row2");
private final static byte[] COLFAM1 = Bytes.toBytes("colfam1");
private final static byte[] COLFAM2 = Bytes.toBytes("colfam2");
private final static byte[] QUAL1 = Bytes.toBytes("qual1");
private final static byte[] QUAL2 = Bytes.toBytes("qual2");

public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    HTable hTable = new HTable(conf, "testtable");
    
    List<Row> batch = new ArrayList<>();
    
    Put put = new Put(ROW2);
    put.addColumn(COLFAM2, QUAL1,  Bytes.toBytes("val5"));
    batch.add(put);
    
    Get get1 = new Get(ROW1);
    get1.addColumn(COLFAM1, QUAL1);
    batch.add(get1);
    
    Delete delete = new Delete(ROW1);
    delete.deleteColumns(COLFAM1, QUAL2);
    batch.add(delete);
    Get get2 = new Get(ROW2);
    get2.addFamily(Bytes.toBytes("BOGUS"));
    batch.add(get2);
    
    Object[] results = new Object[batch.size()];
    
    try {
        hTable.batch(batch, results);
    } catch (Exception e) {
        System.out.println("ERROR : " + e);
    }
    
    Arrays.stream(results).forEach(item -> System.out.println("Result : " + item));
}

 

5. 로우 락

Hbase의 경우 각 로우에 대해 원자성을 보장하여 순차적으로 처리합니다.

그렇기 때문에 위에서 알아본 put, delete, checkAndPut 등도 모두 원자성이 보장되면서 수행이 되어집니다.

 

한 예로 Put 인스턴스를 생성시에 RowLock을 부여하지 않는다면 서버측에서는 해당 로우에 대해서 

메서드가 수행되는 동안에만 유지되는 락을 생성하여 적용하게 됩니다.

 

만약, 한 로우에 대해서 클라이언트측이 커스텀하게 락을 획득 및 해제를 하고 싶은 경우에는 아래 메서드를 이용하면 됩니다.

단, 잘못하면 다른 연산 메서드에 영향이 갈 수 있으니 신중히 사용해야 합니다.

 

RowLock lockRow(byte[] row) throws IOException
void unlockRow(RowLock rl) throws IOException

 

락은 다른 수행에 영향이 끼칠 수 있으니 무기한으로 설정되어 있지는 않습니다.

hbase-site.xml에 hbase.resionserver.lease.period 에 값만큼이 락의 유효 기간입니다.

디폴트 1분입니다.

 

6. 스캔

Hbase는 범위 탐색인 스캔 기능을 제공합니다.

스캔은 Hbase가 제공하는 순차적이고 정렬된 저장구조를 활용합니다.

 

1) 소개

 

Hbase 클라이언트는 Scan 이라는 별도의 클래스를 제공하고 있습니다.

 

아래는 사용 메서드입니다.

 

ResultScanner getScanner(Scan scan)
ResultScanner getScanner(byte [] family)
ResultScanner getScanner(byte [] family, byte [] qualifier)

 

Scan 인자를 주면 내부적으로 스캔한 결과를 담은 ResultScanner를 반환합니다.

2,3번째 메서드는 내부적으로 Scan을 만들어서 동작 후 결과를 반환합니다.

 

아래는 Scan의 생성자입니다.

 

Scan()
Scan(byte[] startRow, Filter filter)
Scan(byte[] startRow)
Scan(byte[] startRow, byte[] stopRow)

 

생성자에서 알 수 있듯이 Scan은 꼭 정확한 로우를 몰라도 탐색이 가능합니다.

startRow만 주게된다면 startRow보다 같거나 큰 로우들부터 탐색을하게 됩니다.

stopRow는 탐색이 해당 stopRow보다 크거나 같은 로우를 만나면 끝나게 됩니다.

 

Filter는 말그대로 탐색 중 사용자가 지정한 데이터만을 얻기위해서 있는 인자입니다.

 

인자가 없는 기본 생성자로도 수행은 되며, 이경우에는 모든 컬럼패밀리 및 그에 속한 컬럼을 포함한 전체 테이블을 스캔합니다.

 

아래는 위 CRUD의 메서드들과 같이 탐색의 범위를 줄이기 위한 메서드입니다.

 

Scan addFamily(byte [] family)
Scan addColumn(byte [] family, byte [] qualifier)
Scan setTimeRange(long minStamp, long maxStamp) throws IOException
Scan setTimeStamp(long timestamp)
Scan setMaxVersions()
Scan setMaxVersions(int maxVersions)

 

그 외에도 Scan 클래스의 경우에는 getter/setter가 있어 언제든지 활용이 가능합니다.

 

 

2) ResultScanner 클래스

 

스캔의 경우에는 한번의 RPC로 로우들을 클라이언트한테 반환하지 않습니다.

이유는, 방대한 로우가 탐색될 수도 있어 한번의 RPC로는 오버헤드가 발생할 수 있기때문입니다.

 

때문에, ResultScanner는 Get과 비슷한 연산으로 변환 후 Row에 대한 Result 인스턴스를 이터레이트할 수 있도록 감싼 형태입니다.

 

아래는 ResultScanner의 메서드입니다.

 

Result next() throws IOException // 한번에 한 Result 반환
Result[] next(int nbRows) throws IOException // nbRows 만큼의 Result 반환
void close()

 

스캐너의 경우 서버 측 리소스를 꽤 사용하기 때문에 힙 공간을 낭비하게 됩니다.

때문에, 꼭 작업이 끝난다음에는 close를 명시적으로 호출하여 자원을 반환하게 해야합니다.

 

 

아래는 스캐너 사용 예제입니다.

 

public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable hTable = new HTable(conf, "testtable");

    Scan scan1 = new Scan();
    ResultScanner scanner1 = hTable.getScanner(scan1);
    for(Result res: scanner1) {
        System.out.println(res);
    }
    scanner1.close();

    Scan scan2 = new Scan();
    scan2.addFamily(Bytes.toBytes("colfam1"));
    ResultScanner scanner2 = hTable.getScanner(scan2);
    for(Result res: scanner2) {
        System.out.println(res);
    }
    scanner2.close();

    Scan scan3 = new Scan();
    scan3.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"))
            .addColumn(Bytes.toBytes("colfam2"), Bytes.toBytes("col-33"))
            .setStartRow(Bytes.toBytes("row-10"))
            .setStopRow(Bytes.toBytes("row-20"));
    ResultScanner scanner3 = hTable.getScanner(scan3);
    for(Result res: scanner3) {
        System.out.println(res);
    }
    scanner3.close();
}

 

3) 캐싱 대 일괄처리

 

스캔의 경우 next메서드가 호출될때 마다 RPC가 발생합니다.

이는, 데이터 크기가 작은 경우에는 오히려 성능상으로 안좋을 수 있습니다.

 

때문에, Hbase에서는 스캐너 캐싱이라는 기능을 제공합니다.

 

이 캐싱 기능은 아래와 같이 2개의 다른 수중에서 활성화할 수 있습니다.

 

  • 테이블 단위의 캐싱
  • 스캔 단위의 캐싱

 

1. 테이블 단위 캐싱

 

테이블 단위의 캐싱을 사용할 때는 아래 메서드를 사용하면 됩니다.

 

void setScannerCaching(int scannerCaching)
int getScannerCaching()

 

아래는 스캔 단위의 캐싱 메서드입니다.

 

void setCaching(int caching)
int getCaching()

 

이러한 캐싱의 효과로는 RPC가 반환하는 로우 갯수를 사용자가 제어할 수 있다는 점입니다.

무조건 캐싱양을 늘리기보다는 클라이언트와 서버의 스펙을 고려하여 최적점을 찾아야 합니다.
무제한으로 올리게 된다면 최악에는 OutOfMemoryException이 발생하게 됩니다.

 

하지만 예기치 못하게 하나의 로우가 매우 커서 메모리 이슈가 발생할 수 도 있습니다.

 

Hbase에서는 이를 위해 일괄처리 기능을 제공합니다.

 

void setBatch(int batch)
int getBatch()

 

일괄처리는 캐싱과 다르게 로우단위가 아닌 컬럼단위로 동작합니다.

한마디로 next 메서드 호출 시 반환되는 컬럼갯수를 제어합니다.

 

예를들어, setBatch(5)로 세팅을 하게된다면 Result 인스턴스마다 다섯개의 컬럼이 반환되어집니다.

 

만약 로우의 컬럼이 17개이고 일괄처리를 5로 설정했을때는 하나의 로우를 조각내어
5->5->5->2 개로 반환받게 됩니다.

 

처음에는 이 2개를 고려하지 않아도 되지만,

점차 서비스가 커지면서 최적화를 해야한다면 위의 스캐너 캐싱과 일괄처리를 적절히 조합하여 사용해야합니다.

 

 

7. 마무리

 

이번 포스팅에서는 클라이언트 API : 기본 기능에 대해 진행하였습니다.

 

다음 포스팅에서는 챕터 4장인 클라이언트 API : 고급 기능에 대해 진행하겠습니다.

반응형

'BigData > Hbase' 카테고리의 다른 글

(5) 클라이언트 API : 관리 기능  (0) 2020.06.02
(4) 클라이언트 API : 고급 기능  (0) 2020.04.19
(2) 설치  (0) 2020.04.08
(1) 소개  (0) 2020.04.07
반응형

1. 서론

 

이번 포스팅에서는 Hadoop 카테고리에 있는 Hadoop 설치에서 Hbase를 추가하는 과정을 진행하겠습니다.

 

2. 설치

1) Hbase 서비스 추가

 

먼저 설치되어 있는 클라우데라 매니저 UI에 접속하여 아래와 같이 서비스 추가를 클릭합니다.

 

 

클릭하게 되면 아래와 같이 추가할 수 있는 서비스 목록 화면으로 넘어가게 됩니다.

Hbase를 선택해 주세요.

 

 

2) 서버 선택

 

Hbase를 선택하고 완료 버튼을 누르게 되면 마스터, 리전, 쓰리프트, Rest 용도의 서비스를 어느 서버에 추가할 지 선택하는 화면이 나옵니다.

 

 

저의 경우, 포스팅을 위해 사전에 미리 설치하여 위와 같이 빨간색으로 뜨고 있습니다.

저는 총 1~4번 서버 중에  Master는 1번 리전서버는 2~4번 서버를 선택하였습니다.

Rest 서버와 쓰리프트 서버는 별도로 등록하진 않았습니다. 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

 

3) Hbase 설치 완료

 

이제 계속 버튼을 눌러 설치를 진행하게되면 Hbase가 설치가 됩니다.

 

이런 편리함 때문에, 클라우데라와 같은 매니저 역할의 상용 서비스를 사용하는것이 편합니다.

 

 

4) 기타 설정

 

설치는 완료 되었지만 Hbase에는 느낌표와 빨간 경고표시가 많이 보이게 될 것입니다.

 

이 부분은 Hadoop 설치 부분에서도 언급했었던, heap 사이즈를 root로 잡고 있기 때문입니다.

 

root의 디스크 공간이 충분하다면 안보이실 수 있습니다.

 

heap의 경우에는 아래와 같이 경로를 변경해 줍니다.

저의 경우에는 /home1/irteam/tmp로 변경하였습니다.

 

 

추가로 hbase관련 log를 적재하는 디렉터리도 root가 부족하다면 변경해야 합니다.

이것은 직접 서버에 들어가 아래와 같이 수정해주시면 됩니다.

 

sudo mkdir -p /home1/irteam/var/log/hbase
sudo mv /var/log/hbase/* /home1/irteam/var/log/hbase
sudo rm -rf /var/log/hbase
sudo ln -s /home1/irteam/var/log/hbase /var/log/hbase
sudo chmod -R 777 /home1/irteam

 

저의 경우에는 기본 경로인 /var/log/hbase에는 디스크가 충분하지 않아,

충분한 /home1/irteam/var/log/hbase 로 데이터를 옮긴 후 심볼릭 링크를 걸었습니다.

 

 

5) 재시작

 

이젠 heap 경로의 설정이 변경된것을 반영하기 위해 Hbase를 아래와 같이 재시작해줍니다.

 

3. 설치 확인

설치가 제대로 되었는지 확인하기 위해 서버에 접속하여 hbase 쉘에 접속 테스트를 진행해 봅니다.

 

아래와 같이 hbase shell 명령어를 통해 접속이 가능합니다.

 

간단히 클러스터의 상태를 확인하는 status 명령어를 수행하여 정상적으로 돌고 있는지 확인합니다.

 

위에서 설명한것과 같이 1개의 마스터와 3개의 리전서버가 active 한것을 볼 수 있습니다.

 

4. 마무리

 

이번 포스팅에서는 Hbase 설치를 진행해봤습니다.

 

다음 포스팅에서는 챕터 3장인 클라이언트 API:기본 기능에 대해 진행하도록 하겠습니다.

 

반응형

'BigData > Hbase' 카테고리의 다른 글

(5) 클라이언트 API : 관리 기능  (0) 2020.06.02
(4) 클라이언트 API : 고급 기능  (0) 2020.04.19
(3) 클라이언트 API : 기본 기능  (0) 2020.04.09
(1) 소개  (0) 2020.04.07

+ Recent posts