이번 포스팅에서는 Hbase가 제공하는 클라이언트 API 중 고급 기능에 대해 알아보겠습니다.
2. 필터
Hbase에서는 Get 말고도 필터를 이용하여 데이터를 조회하여 가져올 수 있습니다.
1) 필터 소개
Hbase 클라이언트는 Filter라는 추상클래스와 여러가지 구현클래스를 제공하고 있습니다.
또한, 개발자는 직접 Filter 추상 클래스를 구현하여 사용할 수 있습니다.
모든 필터는 실제로 서버측에 적용이 되어 수행되어 집니다.
클라이언트에서 적용되는 경우에는 많은 데이터를 가져와 필터링해야 하기 때문에 대규모 환경에서는 적합하지 않습니다.
아래는 필터가 실제로 어떻게 적용되는지 보여줍니다.
클라이언트에서 필터 생성 -> RPC로 직렬화한 필터 서버로 전송 -> 서버에서 역직렬화하여 사용
2) 필터의 계층 구조
Filter 추상클래스가 최상단이며 추상클래스를 상속받아 뼈대를 제공하는 추상클래스로 FilterBase가 있습니다.
Hbase에서 제공하는 구현 클래스들은 FilterBase를 상속하고 있는 형태입니다.
3) 비교 필터
Hbase가 제공하는 필터 중 비교연산을 지원하는 CompareFilter가 있습니다.
생성자 형식은 아래와 같습니다.
CompareFilter(final CompareOperator op, final ByteArrayComparable comparator)
CompareOperator 에는 [LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER, NO_OP] 가 있습니다.
ByteArrayComparable 는 추상 클래스로 compareTo 추상 메서드를 가지고 있습니다.
4) 로우 필터 - RowFilter
로우 필터는 로우 키 기반으로 데이터를 필터링 할 수 있도록 제공하고 있습니다.
아래는 예제 코드입니다.
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));
Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-0"));
Filter filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
ResultScanner scanner1 = hTable.getScanner(scan);
scanner1.forEach(System.out::println);
scanner1.close();
Filter filter2 = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(".*-.5"));
scan.setFilter(filter2);
ResultScanner scanner2 = hTable.getScanner(scan);
scanner2.forEach(System.out::println);
scanner2.close();
Filter filter3 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("-5"));
scan.setFilter(filter3);
ResultScanner scanner3 = hTable.getScanner(scan);
scanner3.forEach(System.out::println);
scanner3.close();
위 예제에서는 아래 필터들을 사용하는 것을 볼 수 있습니다.
filter1 : 지정한 로우키에 대해서 사전편찬식으로 저장되는 로우들을 이용하여 필터
filter2 : 정규표현식을 이용하여 필터
filter3 : 부분 문자열을 이용하여 필터.
5) 패밀리 필터 - FamilyFilter
패밀리 필터의 경우 로우 필터와 동작방식이 비슷하지만 로우 키가 아닌 로우 안의 컬럼패밀리를 대상으로 비교합니다.
아래는 예제 코드입니다.
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));
Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));
Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner = hTable.getScanner(scan);
scanner.forEach(System.out::println);
scanner.close();
Get get1 = new Get(Bytes.toBytes("row-5"));
get1.setFilter(filter1);
Result result1 = hTable.get(get1);
System.out.println("Result of get(): " + result1);
Filter filter2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("colfam3")));
Get get2 = new Get(Bytes.toBytes("row-5"));
get2.addFamily(Bytes.toBytes("colfam1"));
get2.setFilter(filter2);
Result result2 = hTable.get(get2);
System.out.println("Result of get(): " + result2);
위 예제 볼 수 있듯이 Filter는 Get, Scan 두개에 setFilter 메서드를 통해 적용 가능합니다.
또한, 컬럼 패밀리도 사전 편찬식으로 저장되는것을 이용하는것을 볼 수 있습니다.
6) 퀄리파이어 필터 - QualifierFilter
퀄리파이어 필터는 말 그대로 퀄리파이어를 대상으로 비교하는 필터입니다.
아래는 예제 코드입니다.
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));
Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2")));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = hTable.getScanner(scan);
scanner.forEach(System.out::println);
scanner.close();
Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = hTable.get(get);
System.out.println("Result of get() : " + result);
7) 값 필터
이번에는 값을 대상으로 비교하는 필터입니다.
아래는 예제입니다.
Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));
Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4"));
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = hTable.getScanner(scan);
scanner.forEach(result -> {
Arrays.asList(result.rawCells()).forEach(System.out::println);
});
scanner.close();
Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = hTable.get(get);
Arrays.asList(result.rawCells()).forEach(System.out::println);
8) 의존 컬럼 필터 - DependentColumnFilter
의존 컬럼 필터의 경우 단순 필터링이 아닌 더 복잡한 필터 기능을 제공합니다.
이 필터는 다른 컬럼이 필터링될지 여부를 결정하는 의존 컬럼을 지정합니다.
아래는 의존 컬럼 필터의 생성자 입니다.
DependentColumnFilter(
final byte [] family,
final byte[] qualifier,
final boolean dropDependentColumn,
final CompareOperator op,
final ByteArrayComparable valueComparator
)
9) 단일 컬럼값 필터 - SingleColumnValueFilter
단일 컬럼값 필터는 특정 컬럼과 값에 대해 비교 필터로 사용합니다.
생성자는 아래와 같습니다.
SingleColumnValueFilter(
final byte [] family,
final byte [] qualifier,
final CompareOperator op,
final byte[] value
)
SingleColumnValueFilter(
final byte [] family,
final byte [] qualifier,
final CompareOperator op,
final ByteArrayComparable comparator
)
하지만, 제공하는 필터가 아닌 사용자가 만든 Custom 한 필터가 필요한 경우가 있습니다.
이런경우에는 Filter 혹은 FilterBase 추상 클래스를 상속받아 만들 수 있습니다.
반응형
3. 카운터
Hbase에서는 고급 기능인 카운터 기능도 제공하고 있습니다.
1) 카운터 소개
카운터 기능이 없다면 개발자는 수동으로 아래와 같은 절차를 만들어야 합니다.
로우 lock
값 조회
값 증가 후 write
lock 해제
하지만 이러한 절차는 과도한 경합상황을 야기하며,
lock이 잠긴상태로 어플리케이션이 죽게되면 lock 정책에 따른 타임아웃이 끝나기 전까지는 다른 어플리케이션인 접근할 수 없게 됩니다.
Hbase에서는 클라이언트 측 호출을 통해서 이러한 절차를 원자적으로 처리할 수 있도록 제공하고 있습니다.
더불어, 한 호출로 여러개의 카운터 갱신 기능도 제공하고 있습니다.
아래는 hbase shell 에서 카운터 예제를 수행한 사진입니다.
incr 은 주어진 인자 만큼 카운터의 값을 증가 시킨 후 반환 받습니다.
get_counter는 현재 카운터 값을 반환합니다.
아래는 카운터도 보통의 컬럼 중에 하나라는것을 증명하는 사진입니다.
추가로 incr의 경우 음수를 인자로 주어 카운터를 감소시킬수도 있습니다.
2) 단일 카운터
Hbase 클라이언트는 단일 카운터만을 대상으로 처리할 수 있도록 제공하고 있습니다.
이때, 정확한 카운터 컬럼을 지정해야 합니다.
아래는 HTable 클래스에서 제공하는 메서드입니다.
long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)
카운터를 수행할 좌표값(row, family, qualifier) 와 카운터 증감 값을 인자로 받는것을 알 수 있습니다.
단, Durability 인자로 오버로딩이 되어 있습니다.
이 Durability 는 WAL에 반영을 어떻게 할건지에 대한 설정입니다.
Durability 는 enum으로 아래와 같은 값들이 정의되어 있습니다.
public enum Durability {
USE_DEFAULT,
SKIP_WAL,
ASYNC_WAL,
SYNC_WAL,
FSYNC_WAL
}
3) 복수 카운터
카운터를 증가시키는 방법으로는 HTable의 increment 메서드가 있습니다.
아래는 increment 메서드 명세입니다.
Result increment(final Increment increment)
이 메서드를 사용하기 위해서는 Increment 인스턴스를 만들어야 합니다.
이 Increment 인스턴스에는 카운터 컬럼의 좌표값과 적절한 세부 사항을 넣어야 합니다.
아래는 Increment의 생성자입니다.
Increment(byte [] row)
생성자로 로우키를 먼저 필수로 지정한 뒤 범위를 줄이고 싶을때는 아래와 같은 메서드로 가능합니다.
Increment addColumn(byte [] family, byte [] qualifier, long amount)
카운터의 경우 버전은 내부적으로 처리하기 때문에 인자에 timestamp를 받는 부분이 없습니다.
또한, addFamily 메서드도 없는데 그 이유는 카운터는 특정 컬럼이기 때문입니다.
추가로 시간 범위를 지정하여 읽기 연산의 이점을 얻는 메서드도 제공하고 있습니다.
Increment setTimeRange(long minStamp, long maxStamp)
public class PutExample {
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
HTable hTable = new HTable(conf, "testtable");
Put put = new Put(Bytes.toBytes("row1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));
hTable.put(put);
}
}
위의 Configuration은 org.apache.hadoop.conf 패키지에 존재하며 설정 정보를 주입하는 역할입니다.
기존 설정정보는 hbase-site.xml에 기입해야하지만, 동적으로 변경해야 할때는 Configuration를 사용하면 됩니다.
아래는 주키퍼 쿼럼 정보를 동적으로 Configuration를 사용하여 세팅하는 예제입니다.
부여하지 않을 시에는 로우가 저장소에 추가되는 순간에 해당 리전서버의 시각으로 자동 부여가 됩니다.
Hbase의 경우, 셀은 타임스탬프값을 기준으로 정렬되어져 저장됩니다.
2. KeyValue 클래스
코드상에서 KeyValue 인스턴스를 처리해야 하는 경우가 종종 있습니다.
때문에, KeyValue 클래스에 대해 간단히 살펴보겠습니다.
우선, KeyValue 클래스는 특정 셀의 정보를 가지고 있습니다.
특정 셀의 정보는 로우 키, 컬럼패밀리, 컬럼 퀄리파이어, 타임스탬프를 의미합니다.
메서드로는 아래와 같이 있습니다.
// 아래 3개는 KeyValue 인스턴스에 저장되어 있는 전체 바이트 배열에 관한 메서드입니다.
byte[] getBuffer()
int getOffset()
int getLength()
// 아래 2개는 로우키와 데이터가 저장된 워시 좌표 정보의 바이트 배열을 반환하는 메서드입니다.
byte[] getRow()
byte[] getKey()
또한, KeyValue 클래스는 내부적으로 Comparator를 통해 값에 대해서 커스텀하게 정렬을 할 수 있도록 제공합니다.
아래는 KeyValue 클래스에서 제공하는Comparator 종류입니다.
비교 연산자
설명
KeyComparator
KeyValue 2개의 키를 비교합니다. 즉, getKey 메서드를 통해 원시 바이트 배열을 비교합니다.
KVComparator
원시 형태의 KeyComparator를 감싼 형으로서, KeyValue 2개를 비교할 때 사용합니다.
RowComparator
getRow 로 얻은 값으로 비교합니다.
KeyValue 클래스는 type이라는 필드를 가지고 있습니다.
이 type은 KeyValue에 하나의 차원이 더 추가되는것과 같습니다.
type 에 사용가능한 값은 아래와 같습니다.
유형
설명
Put
해당 KeyValue 인스턴스가 일반적인 Put 연산임을 의미합니다.
Delete
해당 KeyValue 인스턴스가 일반적인 Delete 연산임을 의미합니다.
DeleteColumn
Delete와 같지만, 더 광범위하게 전체 컬럼을 삭제한다는 의미입니다.
DeleteFamily
Delete와 같지만, 더 광범위하게 전체 컬럼패밀리 및 그에 속한 모든 컬럼을 삭제한다는 의미입니다.
위와 같이 false로 설정된 후부터는 put 메서드를 호출하더라도 클라이언트측 메모리 버퍼에 저장되고 서버로는 실제 전송이 이루어 지지 않습니다.
서버로 RPC를 날리기 위해서는 아래와 같은 메서드를 호출하면 됩니다.
void flushCommits() throws IOException
쓰기 버퍼의 경우 클라이언트에서 내부적으로 알아서 비워주기 때문에 크게 고려하지 않아도 됩니다.
추가로, 쓰기 버퍼의 크기도 조정이 가능한데 방법은 아래와 같습니다.
long getWriteBufferSize()
long setWriteBufferSize(long writeBufferSize) throws IOException
물론, 명시적으로 버퍼를 비울수도 있습니다.
개발자가 작성한 코드로 인해서 버퍼가 비워지는 경우는 아래와 같습니다.
flushCommits 호출 시
autoFlush가 true의 경우 put 메서드 호출 시
setWriteBufferSize 호출 시
아래는 쓰기 버퍼를 사용한 예제 입니다.
public static void main(String[] args) {
Configuration conf = HBaseConfiguration.create();
HTable hTable = new HTable(conf, "testtable");
System.out.println("Auto flush: " + hTable.isAutoFlush());
hTable.setAutoFlush(false);
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
hTable.put(put1);
Put put2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val2"));
hTable.put(put2);
Put put3 = new Put(Bytes.toBytes("row3"));
put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val3"));
hTable.put(put3);
Get get = new Get(Bytes.toBytes("row1"));
Result res1 = hTable.get(get);
System.out.println("Result: " + res1);
hTable.flushCommits();
Result res2 = hTable.get(get);
System.out.println("Result: " + res2);
}
위 결과 출력은 아래와 같습니다.
Auto flush: true
Result: keyvalues=None
Result: keyvalues={ro1/colfam1/qual1/123412358/Put/vlen=4}
위에서 설명한것과 같이 버퍼에 쓰기만 하고 서버로 PRC를 하지 않았기 때문에
첫번째 Get에서는 None이 나온것을 볼 수 있습니다.
만약 버퍼에 있는 데이터를 보기 위해서는 아래 메서드를 사용하면 됩니다.
ArrayList<Put> getWriteBuffer()
하지만 이 방법은 멀티쓰레드 환경에서 조심해야 합니다.
이유로는 List에 접근 시 힙 크기를 확인하지 않고 접근하며, 또 버퍼 비우기가 진행되는 도중 다른 쓰레드가 값을 변경할 수 있기 때문입니다.
4. Put 리스트
클라이언트 API는 단일 put이 아닌 List<put>도 처리 가능하도록 제공합니다.
void put(List<Put> puts) throws IOException
위 메서드를 사용하면 List<Put>으로 데이터 적재가 가능합니다.
다만, List에 있는 모든 Put이 성공하지 않을 수 있습니다.
성공하지않은 Put이 있다면 클라이언트는 IOException을 받게됩니다.
하지만, Hbase는 List에 있는 put을 이터레이트돌며 적용하기 때문에 하나가 실패한다고 안에 있는것이 모두 실패하지는 않습니다.
실패한 Put은 쓰기버퍼에 남아있게 되고 다음 flush 작업에서 재수행하게 됩니다.
만약 데이터가 잘못되어 실패되는 케이스라면 계속 버퍼에 남게되어 재수행을 반복하게 될 것입니다.
이를 방지하기 위해서는 수동을 버퍼를 비워줘야 합니다.
아래는 일괄 put을 날린 후 try-catch를 통해 실패가 발생하게 있다면 명시적으로 버퍼를 비우는 예제 코드입니다.
List<Put> puts = new ArrayList<>();
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
puts.add(put1)
Put put2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("BOGUS"), Bytes.toBytes("qual1"), Bytes.toBytes("val2"));
puts.add(put2)
Put put3 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val3"));
puts.add(put3)
Put put4 = new Put(Bytes.toBytes("row2"));
puts.add(put4)
try {
hTable.put(puts);
} catch (Exception e) {
hTable.flushCommits();
}
추가로, 리스트 기반의 입력시에는 Hbase 서버에서 입력 연산의 순서가 보장되지 않습니다.
5. 원자적 확인 후 입력 연산
Hbase에서는 원자적 확인 후 입력이라는 특별한 기능을 제공합니다.
이 기능은 특정한 조건에 만족하는 경우 put 연산을 수행할 수 있도록합니다.
반환값으로는 boolean 값으로 put 연산이 수행되었는지의 여부를 의미합니다.
사용법으로는 아래와 같습니다.
최신 버전에서는 deprecated 되어 있습니다.
boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException
boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException
boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException
2) Get 메서드
Hbase 클라이언트는 데이터를 읽어오는 Get 메서드를 제공합니다.
1. 단일 Get
특정 값을 반환받는데 사용하는 메서드입니다.
Result get(Get get) throws IOException
put과 유사하게 get 메서드도 전용 Get 클래스를 인자로 받고 있습니다.
Get 클래스의 생성자 메서드는 아래와 같습니다.
Get(byte[] row)
Get(byte[] row, RowLock rowLock)
아래는 한 로우에대해서 읽는 데이터 범위를 줄이기위해 필요한 보조적인 메서드들입니다.
Get addFamily(byte[] family)
Get addColumn(byte[] family, byte[] qualifier)
Get setTimeRange(long minStamp, long maxStamp) throws IOException
Get setTimeStamp(long timestamp)
Get setMaxVersions()
Get setMaxVersions(int maxVersions) throws IOException
읽어올때는 위에서 소개한 Bytes 헬퍼 클래스를 통해 byte[]을 원하는 데이터 타입으로 변환이 가능합니다.
static String toString(byte[] b)
static boolean toBoolean(byte[] b)
static long toLong(byte[] b)
static float toFloat(byte[] b)
static int toInt(byte[] b)
반응형
2. Result 클래스
get 메서드의 반환 타입은 Result 클래스입니다.
Result 클래스는 Hbase 데이터인 컬럼패밀리, 퀄리파이어, 타임스탬프, 값 등을 모두 가지고 있으며 내부 메서드로 제공하고 있습니다.
byte[] getValue(byte[] family, byte[] qualifier) // 특정 셀 값
byte[] value() // 사전 편찬식으로 정렬된 KeyValue 중에 첫번째 값
byte[] getRow() // 로우키
int size() // KeyValue 갯수
boolean isEmpty() // KeyValue 존재 여부
KeyValue[] raw() // KeyValue 접근을 위한 메서드
List<KeyValue> list() // raw에서 반환되는 KeyValue 배열을 단순히 list 형식으로 바꿔서 반환하는 메서드
delete()는 인수로 전달했던 List<Delete> 에서 실패한 Delete 객체만이 남게됩니다.
결국, 모두 성공했을때는 인자로 받은 List<Delete>는 텅비게 되어집니다.
3. 원자적 확인 후 삭제 연산
delete에서도 put과 동일하게 원자적으로 확인 후 삭제 연산을 할 수 있도록 제공하고 있습니다.
boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final byte[] value, final Delete delete) throws IOException
boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException
boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, final Delete delete) throws IOException
value 파라미터에 null을 넣어서 수행하게 되면 해당 컬럼 값이 존재하는지 여부를 검사합니다.
4. 일괄처리 연산
지금까지는 단일 또는 리스트 기반의 연산을 알아봤습니다.
이번에는 여러개의 로우에 대해서 다양한 연산을 일괄처리하는 API에 대해서 알아보겠습니다.
클라이언트는 이 일괄처리를 위해 Put, Get, Delete의 조상 클래스인 Row 클래스를 제공합니다.
메서드는 아래와 같습니다.
void batch(final List<? extends Row> actions, final Object[] results)
여러 Row 하위 클래스들을 인자로 받아 처리 후 2번째 인자로 받은 Objects 배열에 결과를 담습니다.