반응형

1. 서론

저번 포스팅에서는 impala- mybatis 사용에 대해서 알아보았습니다.

 

그럼, impala는 SQL 지원이 되고 relation이 있으니 ORM 기술인 hibernate도 사용할 수 있지 않나? 라고 생각할 수 있습니다.

 

이 질문은 반은 맞고, 반은 틀리다고 할 수 있습니다.

이유는, hibernate 이념과 impala 이념이 다르기 때문입니다.

 

hibernate는 기본적으로 트랜잭션이 가능한 저장소 를 지원하며, 데이터의 CUD는 무조건 트랜잭션을 통해 이루어집니다.

 

하지만, impala는 하나의 데이터 저장소가 아닌 hdfs, kudu, hbase와 같이 여러 저장소를 지원하는 SQL 인터페이스이며,

가장 중요한 트랜잭션이 없습니다. 게다가 hdfs의 경우에는 delete, update는 되지도 않습니다.

 

하둡의 hbase, kudu 등등은 저장소라고 볼 수 있지만, 다른 dbms들과 같이 트랜잭션개념은 없습니다.
이런 저장소 프로젝트들은 자체 클라이언트를 제공하고 있으며 실패에 대한 처리는 개발자의 몫으로 넘깁니다.

물론, atomic 특성을 제공하는 저장소도 있지만 이것은 트랜잭션이랑은 별개의 내용입니다.

 

하지만, 조회의 경우에는 말이 달라집니다. 일반적으로 조회는 트랜잭션이 필요없습니다.

게다가, 제가 일하는 조직에서는 DB의 실시간 백업용으로 ROS용 Kudu를 사용하고 있습니다.

 

저장소 자체가 ROS이기에 어플리케이션 단에서는 조회만을 하게 되고, hibernate의 엔티티 관계에 대한 이점 을 누릴 수 있습니다.

 

2. 문제점

하지만, Dialect 관련 문제가 하나 있었습니다.

 

hibernate는 기본적으로 Dialect(= 방언)를 설정하지 않으면,

jdbc를 통해 알맞은 데이터베이스가 무엇이고 데이터베이스에 맞는 Dialect을 찾아 세팅해줍니다.

 

jdbc 로부터 데이터베이스를 찾아 Dialect을 세팅하는 과정은 아래와 같습니다.

 

1) DialectResolverInitiator

 

hibernate에서는 Dialect 결정을 위한 서비스 클래스로 DialectResolverInitiator 클래스의 initiateService 메서드를 호출합니다.

 

아래는 DialectResolverInitiator 클래스의 initiateService 메서드입니다.

 

@Override
public DialectResolver initiateService(Map configurationValues, ServiceRegistryImplementor registry) {
	final DialectResolverSet resolver = new DialectResolverSet();

	applyCustomerResolvers( resolver, registry, configurationValues );
	resolver.addResolver( new StandardDialectResolver() );
    
	return resolver;
}

 

코드에서 보이는것과 같이 customResolver 제외하고, 기본 Resolver로 StandardDialectResolver 를 사용합니다.

 

 

2) StandardDialectResolver

 

아래는 StandardDialectResolver 클래스 코드입니다.

hibernate-core에 있는 Database enum 클래스를 iterate돌며 jdbc에 알맞은 dialect를 찾는 로직입니다.

public final class StandardDialectResolver implements DialectResolver {

	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {

		for ( Database database : Database.values() ) {
			Dialect dialect = database.resolveDialect( info );
			if ( dialect != null ) {
				return dialect;
			}
		}

		return null;
	}
}

 

3) DataBase

 

아래는 DataBase enum 클래스의 일부 코드입니다.

코드에서 보이듯이 각 Database에 대해서 정의가 되어 있으며, 인자로 받은 DialectResolutionInfo info 를 통해 맞는 Database인지 확인하는 로직이 있습니다.

 

DialectResolutionInfo 에서 사용하는 getDatabaseName, getDatabaseMajorVersion 등의 메서드는 jdbc의 DatabaseMetaData interface 구현체 메서드를 사용하도록 되어있습니다.

 

ORACLE {
	@Override
	public Class<? extends Dialect> latestDialect() {return Oracle12cDialect.class;}

	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {
		final String databaseName = info.getDatabaseName();

		if ( "Oracle".equals( databaseName ) ) {
			final int majorVersion = info.getDatabaseMajorVersion();

			switch ( majorVersion ) {
				case 12:
					return new Oracle12cDialect();
				case 11:
					// fall through
				case 10:
					return new Oracle10gDialect();
				case 9:
					return new Oracle9iDialect();
				case 8:
					return new Oracle8iDialect();
				default:
					return latestDialectInstance( this );
			}
		}

		return null;
	}
},
POINTBASE {
	@Override
	public Class<? extends Dialect> latestDialect() {
		return PointbaseDialect.class;
	}
	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {
		return null;
	}
},
POSTGRESQL {
	@Override
	public Class<? extends Dialect> latestDialect() {
		return PostgreSQL10Dialect.class;
	}
	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {
		final String databaseName = info.getDatabaseName();
		if ( "PostgreSQL".equals( databaseName ) ) {
			final int majorVersion = info.getDatabaseMajorVersion();
			final int minorVersion = info.getDatabaseMinorVersion();
			if ( majorVersion < 8 ) {
				return new PostgreSQL81Dialect();
			}
			if ( majorVersion == 8 ) {
				return minorVersion >= 2 ? new PostgreSQL82Dialect() : new PostgreSQL81Dialect();
			}
			if ( majorVersion == 9 ) {
				if ( minorVersion < 2 ) {
					return new PostgreSQL9Dialect();
				}
				else if ( minorVersion < 4 ) {
					return new PostgreSQL92Dialect();
				}
				else if ( minorVersion < 5 ) {
					return new PostgreSQL94Dialect();
				}
				else {
					return new PostgreSQL95Dialect();
				}
			}
			return latestDialectInstance( this );
		}
		return null;
	}
},

 


하지만 서론에서 말씀드린것과 같이 hibernate 이념과 impala는 맞지 않아,

Database enum 클래스에는 IMPALA 가 없을 뿐더러 ImpalaDialect도 존재하지 않습니다.

위에 보여드린 3개 클래스는 모두 hibernate-core lib에 있습니다.

 

 

반응형

3. 해결법

1) Hibernate PR

 

위 이슈와 관련해서, hibernate 쪽에 PR을 올려둔 상태입니다.

하지만, hibernate의 이념과 맞지 않고, 너무 특정케이스를 위해서이기 때문에 PR에 대한 approve가 될지는 모르겠습니다.....ㅠ

소스 변경이 아닌 추가여서, 장애 포인트는 없지만 이념과 다르다는 피드백을 받을거 같네요....ㅎㅎ

 

2) Custom resolver, database, dialect 등록

 

그래도 사용은 할 수 있습니다.

바로 위에서 살펴본 DialectResolverInitiator 클래스에 customResolver를 등록하는 코드가 있기 때문입니다!!

 

혹시나, 저와 같이 필요한 경우가 있다면 아래와 같이 커스텀한 resolver, database, dialect를 사용하여

조회용도의 impala-hibernate 조합을 사용할 수 있습니다.

 

1. Custom Resolver

 

커스텀한 Resolver를 만들어줍니다.

 

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public class CustomDialectResolver implements DialectResolver {

    @Override
    public Dialect resolveDialect(DialectResolutionInfo info) {
        for ( ImpalaDatabase database : ImpalaDatabase.values() ) {
            Dialect dialect = database.resolveDialect( info );
            if ( dialect != null ) {
                return dialect;
            }
        }
        return null;
    }
}

 

2. Custom Database 

 

Resolver에서 사용하는 Database enum 클래스를 만들어 줍니다.

코드에서 알 수 있듯이, ImpalaJdbc는 getDatabaseName 메서드로 Impala 를 반환합니다.

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public enum ImpalaDatabase {

    IMPALA {
        @Override
        public Class<? extends Dialect> latestDialect() {
            return ImpalaDialect.class;
        }

        @Override
        public Dialect resolveDialect(DialectResolutionInfo info) {
            final String databaseName = info.getDatabaseName();
            if ("Impala".equals(databaseName)) {
                return latestDialectInstance(this);
            }
            return null;
        }
    };

    public abstract Class<? extends Dialect> latestDialect();

    public abstract Dialect resolveDialect(DialectResolutionInfo info);

    private static Dialect latestDialectInstance(ImpalaDatabase database) {
        try {
            return database.latestDialect().newInstance();
        }
        catch (InstantiationException | IllegalAccessException e) {
            throw new HibernateException( e );
        }
    }
}

 

3. Custom Dialect

 

Database에서 반환할 Dialect를 만듭니다.

해당 Dialect은 Cloudera Document - SQL Reference를 참고하여 만들었습니다.

모든 function, dml, ddl에 대해서 확인하는데에 한계가 있어 틀린부분이 있을 수 있지만
애초에 Custom Dialect이기 때문에 해당 부분은 fix하여 사용하면 됩니다.

 

import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.sql.Types;
import org.hibernate.MappingException;
import org.hibernate.cfg.Environment;
import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.function.StandardSQLFunction;
import org.hibernate.dialect.pagination.AbstractLimitHandler;
import org.hibernate.dialect.pagination.LimitHandler;
import org.hibernate.dialect.pagination.LimitHelper;
import org.hibernate.engine.jdbc.env.spi.NameQualifierSupport;
import org.hibernate.engine.spi.RowSelection;
import org.hibernate.tool.schema.extract.internal.SequenceInformationExtractorHSQLDBDatabaseImpl;
import org.hibernate.tool.schema.extract.spi.SequenceInformationExtractor;
import org.hibernate.type.StandardBasicTypes;

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public class ImpalaDialect extends Dialect {

    private final class ImpalaLimitHandler extends AbstractLimitHandler {
        @Override
        public String processSql(String sql, RowSelection selection) {
            final boolean hasOffset = LimitHelper.hasFirstRow( selection );
            return sql + (hasOffset ? " limit ? offset ?" : " limit ?");
        }

        @Override
        public boolean supportsLimit() {
            return true;
        }

        @Override
        public boolean bindLimitParametersFirst() {
            return true;
        }
    }

    private final LimitHandler limitHandler;


    /**
     * Constructs a ImpalaDialect
     */
    public ImpalaDialect() {
        super();

        registerColumnType( Types.BIGINT, "bigint" );
        registerColumnType( Types.BOOLEAN, "boolean" );
        registerColumnType( Types.CHAR, "char($l)" );
        registerColumnType( Types.DECIMAL, "decimal($p,$s)" );
        registerColumnType( Types.DOUBLE, "double" );
        registerColumnType( Types.FLOAT, "float" );
        registerColumnType( Types.INTEGER, "int" );
        registerColumnType( Types.SMALLINT, "smallint" );
        registerColumnType( Types.VARCHAR, "string" );
        registerColumnType( Types.BLOB, "string" );
        registerColumnType( Types.CLOB, "string" );
        registerColumnType( Types.TINYINT, "tinyint" );
        registerColumnType( Types.TIMESTAMP, "timestamp" );

        // Impala Mathematical Functions
        registerFunction( "abs", new StandardSQLFunction("abs") );
        registerFunction( "acos", new StandardSQLFunction( "acos", StandardBasicTypes.DOUBLE ) );
        registerFunction( "asin", new StandardSQLFunction( "asin", StandardBasicTypes.DOUBLE ) );
        registerFunction( "atan", new StandardSQLFunction("atan", StandardBasicTypes.DOUBLE) );
        registerFunction( "atan2", new StandardSQLFunction("atan2", StandardBasicTypes.DOUBLE) );
        registerFunction( "bin", new StandardSQLFunction( "bin", StandardBasicTypes.STRING ) );
        registerFunction( "ceil", new StandardSQLFunction("ceil") );
        registerFunction( "dceil", new StandardSQLFunction("dceil") );
        registerFunction( "ceiling", new StandardSQLFunction("ceiling") );
        registerFunction( "conv", new StandardSQLFunction("conv") );
        registerFunction( "cos", new StandardSQLFunction( "cos", StandardBasicTypes.DOUBLE ) );
        registerFunction( "cot", new StandardSQLFunction( "cot", StandardBasicTypes.DOUBLE ) );
        registerFunction( "cosh", new StandardSQLFunction( "cosh", StandardBasicTypes.DOUBLE ) );
        registerFunction( "degrees", new StandardSQLFunction( "degrees", StandardBasicTypes.DOUBLE ) );
        registerFunction( "e", new StandardSQLFunction( "e", StandardBasicTypes.DOUBLE ) );
        registerFunction( "exp", new StandardSQLFunction( "exp", StandardBasicTypes.DOUBLE ) );
        registerFunction( "factorial", new StandardSQLFunction( "factorial", StandardBasicTypes.INTEGER) );
        registerFunction( "floor", new StandardSQLFunction( "floor", StandardBasicTypes.INTEGER ) );
        registerFunction( "dfloor", new StandardSQLFunction( "dfloor", StandardBasicTypes.INTEGER ) );
        registerFunction( "fmod", new StandardSQLFunction("fmod") );
        registerFunction( "fnv_hash", new StandardSQLFunction( "fnv_hash", StandardBasicTypes.INTEGER ) );
        registerFunction( "greatest", new StandardSQLFunction("greatest") );
        registerFunction( "hex", new StandardSQLFunction( "hex", StandardBasicTypes.STRING ) );
        registerFunction( "is_inf", new StandardSQLFunction( "is_inf", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "is_nan", new StandardSQLFunction( "is_nan", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "least", new StandardSQLFunction("least") );
        registerFunction( "ln", new StandardSQLFunction( "ln", StandardBasicTypes.DOUBLE ) );
        registerFunction( "log", new StandardSQLFunction( "log", StandardBasicTypes.DOUBLE ) );
        registerFunction( "log10", new StandardSQLFunction( "log10", StandardBasicTypes.DOUBLE ) );
        registerFunction( "log2", new StandardSQLFunction( "log2", StandardBasicTypes.DOUBLE ) );
        registerFunction( "max_int", new StandardSQLFunction( "max_int", StandardBasicTypes.INTEGER ) );
        registerFunction( "max_bigint", new StandardSQLFunction( "max_bigint", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "max_smallint", new StandardSQLFunction( "max_smallint", StandardBasicTypes.INTEGER ) );
        registerFunction( "max_tinyint", new StandardSQLFunction( "max_tinyint", StandardBasicTypes.INTEGER ) );
        registerFunction( "min_int", new StandardSQLFunction( "min_int", StandardBasicTypes.INTEGER ) );
        registerFunction( "min_bigint", new StandardSQLFunction( "min_bigint", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "min_smallint", new StandardSQLFunction( "min_smallint", StandardBasicTypes.INTEGER ) );
        registerFunction( "min_tinyint", new StandardSQLFunction( "min_tinyint", StandardBasicTypes.INTEGER ) );
        registerFunction( "mod", new StandardSQLFunction( "mod" ) );
        registerFunction( "murmur_hash", new StandardSQLFunction( "murmur_hash", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "negative", new StandardSQLFunction( "negative" ) );
        registerFunction( "pi", new StandardSQLFunction( "pi", StandardBasicTypes.DOUBLE ) );
        registerFunction( "pmod", new StandardSQLFunction( "pmod" ) );
        registerFunction( "positive", new StandardSQLFunction( "positive" ) );
        registerFunction( "pow", new StandardSQLFunction( "pow", StandardBasicTypes.DOUBLE ) );
        registerFunction( "power", new StandardSQLFunction( "power", StandardBasicTypes.DOUBLE ) );
        registerFunction( "dpow", new StandardSQLFunction( "dpow", StandardBasicTypes.DOUBLE ) );
        registerFunction( "fpow", new StandardSQLFunction( "fpow", StandardBasicTypes.DOUBLE ) );
        registerFunction( "precision", new StandardSQLFunction( "precision", StandardBasicTypes.INTEGER ) );
        registerFunction( "quotient", new StandardSQLFunction( "quotient", StandardBasicTypes.INTEGER ) );
        registerFunction( "radians", new StandardSQLFunction( "radians", StandardBasicTypes.DOUBLE ) );
        registerFunction( "rand", new StandardSQLFunction( "rand", StandardBasicTypes.DOUBLE ) );
        registerFunction( "random", new StandardSQLFunction( "random", StandardBasicTypes.DOUBLE ) );
        registerFunction( "round", new StandardSQLFunction( "round" ) );
        registerFunction( "dround", new StandardSQLFunction( "dround" ) );
        registerFunction( "scale", new StandardSQLFunction( "scale", StandardBasicTypes.INTEGER ) );
        registerFunction( "sign", new StandardSQLFunction( "sign", StandardBasicTypes.INTEGER ) );
        registerFunction( "sin", new StandardSQLFunction( "sin", StandardBasicTypes.DOUBLE ) );
        registerFunction( "sinh", new StandardSQLFunction( "sinh", StandardBasicTypes.DOUBLE ) );
        registerFunction( "sqrt", new StandardSQLFunction( "sqrt", StandardBasicTypes.DOUBLE ) );
        registerFunction( "tan", new StandardSQLFunction( "tan", StandardBasicTypes.DOUBLE ) );
        registerFunction( "tanh", new StandardSQLFunction("tanh", StandardBasicTypes.DOUBLE) );
        registerFunction( "truncate", new StandardSQLFunction( "truncate" ) );
        registerFunction( "dtrunc", new StandardSQLFunction( "dtrunc" ) );
        registerFunction( "trunc", new StandardSQLFunction( "trunc" ) );
        registerFunction( "unhex", new StandardSQLFunction( "unhex", StandardBasicTypes.STRING ) );
        registerFunction( "width_bucket", new StandardSQLFunction( "width_bucket" ) );

        // Impala Bit Functions
        registerFunction( "bitand", new StandardSQLFunction( "bitand" ) );
        registerFunction( "bitor", new StandardSQLFunction( "bitor" ) );
        registerFunction( "bitnot", new StandardSQLFunction( "bitnot" ) );
        registerFunction( "bitxor", new StandardSQLFunction( "bitxor" ) );
        registerFunction( "countset", new StandardSQLFunction( "countset" ) );
        registerFunction( "getbit", new StandardSQLFunction( "getbit" ) );
        registerFunction( "rotateleft", new StandardSQLFunction( "rotateleft" ) );
        registerFunction( "rotateright", new StandardSQLFunction( "rotateright" ) );
        registerFunction( "setbit", new StandardSQLFunction( "setbit" ) );
        registerFunction( "shiftleft", new StandardSQLFunction( "shiftleft" ) );
        registerFunction( "shiftright", new StandardSQLFunction( "shiftright" ) );

        // Impala Date and Time Functions
        registerFunction( "add_months", new StandardSQLFunction( "add_months", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "adddate", new StandardSQLFunction( "adddate", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "current_timestamp", new StandardSQLFunction( "current_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_add", new StandardSQLFunction( "date_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_part", new StandardSQLFunction( "date_part", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_sub", new StandardSQLFunction( "date_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_trunc", new StandardSQLFunction( "date_trunc", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "datediff", new StandardSQLFunction( "datediff", StandardBasicTypes.INTEGER ) );
        registerFunction( "day", new StandardSQLFunction( "day", StandardBasicTypes.INTEGER ) );
        registerFunction( "dayname", new StandardSQLFunction( "dayname", StandardBasicTypes.STRING ) );
        registerFunction( "dayofweek", new StandardSQLFunction( "dayofweek", StandardBasicTypes.INTEGER ) );
        registerFunction( "dayofmonth", new StandardSQLFunction( "dayofmonth", StandardBasicTypes.INTEGER ) );
        registerFunction( "dayofyear", new StandardSQLFunction( "dayofyear", StandardBasicTypes.INTEGER ) );
        registerFunction( "days_add", new StandardSQLFunction( "days_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "days_sub", new StandardSQLFunction( "days_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "extract", new StandardSQLFunction( "extract", StandardBasicTypes.INTEGER ) );
        registerFunction( "from_timestamp", new StandardSQLFunction( "from_timestamp", StandardBasicTypes.STRING ) );
        registerFunction( "from_unixtime", new StandardSQLFunction( "from_unixtime", StandardBasicTypes.STRING ) );
        registerFunction( "from_utc_timestamp", new StandardSQLFunction( "from_utc_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "hour", new StandardSQLFunction( "hour", StandardBasicTypes.INTEGER ) );
        registerFunction( "hours_add", new StandardSQLFunction( "hours_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "hours_sub", new StandardSQLFunction( "hours_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "int_months_between", new StandardSQLFunction( "int_months_between", StandardBasicTypes.INTEGER ) );
        registerFunction( "microseconds_add", new StandardSQLFunction( "microseconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "microseconds_sub", new StandardSQLFunction( "microseconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "millisecond", new StandardSQLFunction( "millisecond", StandardBasicTypes.INTEGER ) );
        registerFunction( "milliseconds_add", new StandardSQLFunction( "milliseconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "milliseconds_sub", new StandardSQLFunction( "milliseconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "minute", new StandardSQLFunction( "minute", StandardBasicTypes.INTEGER ) );
        registerFunction( "minutes_add", new StandardSQLFunction( "minutes_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "minutes_sub", new StandardSQLFunction( "minutes_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "month", new StandardSQLFunction( "month", StandardBasicTypes.INTEGER ) );
        registerFunction( "monthname", new StandardSQLFunction( "monthname", StandardBasicTypes.STRING ) );
        registerFunction( "months_add", new StandardSQLFunction( "months_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "months_between", new StandardSQLFunction( "months_between", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "months_sub", new StandardSQLFunction( "months_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "nanoseconds_add", new StandardSQLFunction( "nanoseconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "nanoseconds_sub", new StandardSQLFunction( "nanoseconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "next_day", new StandardSQLFunction( "next_day", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "now", new StandardSQLFunction( "now", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "quarter", new StandardSQLFunction( "quarter", StandardBasicTypes.INTEGER ) );
        registerFunction( "second", new StandardSQLFunction( "second", StandardBasicTypes.INTEGER ) );
        registerFunction( "seconds_add", new StandardSQLFunction( "seconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "seconds_sub", new StandardSQLFunction( "seconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "subdate", new StandardSQLFunction( "subdate", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "timeofday", new StandardSQLFunction( "timeofday", StandardBasicTypes.STRING ) );
        registerFunction( "timestamp_cmp", new StandardSQLFunction( "timestamp_cmp", StandardBasicTypes.INTEGER ) );
        registerFunction( "to_date", new StandardSQLFunction( "to_date", StandardBasicTypes.STRING ) );
        registerFunction( "to_timestamp", new StandardSQLFunction( "to_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "to_utc_timestamp", new StandardSQLFunction( "to_utc_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "unix_timestamp", new StandardSQLFunction( "unix_timestamp", StandardBasicTypes.INTEGER ) );
        registerFunction( "utc_timestamp", new StandardSQLFunction( "utc_timestamp", StandardBasicTypes.INTEGER ) );
        registerFunction( "weekofyear", new StandardSQLFunction( "weekofyear", StandardBasicTypes.INTEGER ) );
        registerFunction( "weeks_add", new StandardSQLFunction( "weeks_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "weeks_sub", new StandardSQLFunction( "weeks_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "year", new StandardSQLFunction( "year", StandardBasicTypes.INTEGER ) );
        registerFunction( "years_add", new StandardSQLFunction( "years_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "years_sub", new StandardSQLFunction( "years_sub", StandardBasicTypes.TIMESTAMP ) );

        // Impala Conditional Functions
        registerFunction( "coalesce", new StandardSQLFunction( "coalesce" ) );
        registerFunction( "decode", new StandardSQLFunction( "decode" ) );
        registerFunction( "if", new StandardSQLFunction( "if" ) );
        registerFunction( "ifnull", new StandardSQLFunction( "ifnull" ) );
        registerFunction( "isfalse", new StandardSQLFunction( "isfalse", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "nullvalue", new StandardSQLFunction( "nullvalue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "nonnullvalue", new StandardSQLFunction( "nonnullvalue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "istrue", new StandardSQLFunction( "istrue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "isnotfalse", new StandardSQLFunction( "isnotfalse", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "isnottrue", new StandardSQLFunction( "isnottrue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "zeroifnull", new StandardSQLFunction( "zeroifnull" ) );
        registerFunction( "nvl2", new StandardSQLFunction( "nvl2" ) );
        registerFunction( "nvl", new StandardSQLFunction( "nvl" ) );
        registerFunction( "nullifzero", new StandardSQLFunction( "nullifzero" ) );
        registerFunction( "nullif", new StandardSQLFunction( "nullif" ) );
        registerFunction( "isnull", new StandardSQLFunction( "isnull" ) );

        // Impala String Functions
        registerFunction( "ascii", new StandardSQLFunction( "ascii", StandardBasicTypes.INTEGER ) );
        registerFunction( "base64encode", new StandardSQLFunction( "base64encode", StandardBasicTypes.STRING ) );
        registerFunction( "base64decode", new StandardSQLFunction( "base64decode", StandardBasicTypes.STRING ) );
        registerFunction( "left", new StandardSQLFunction( "left", StandardBasicTypes.STRING ) );
        registerFunction( "initcap", new StandardSQLFunction( "initcap", StandardBasicTypes.STRING ) );
        registerFunction( "group_concat", new StandardSQLFunction( "group_concat", StandardBasicTypes.STRING ) );
        registerFunction( "concat_ws", new StandardSQLFunction( "concat_ws", StandardBasicTypes.STRING ) );
        registerFunction( "concat", new StandardSQLFunction( "concat", StandardBasicTypes.STRING ) );
        registerFunction( "chr", new StandardSQLFunction( "chr", StandardBasicTypes.STRING ) );
        registerFunction( "btrim", new StandardSQLFunction( "btrim", StandardBasicTypes.STRING ) );
        registerFunction( "instr", new StandardSQLFunction( "instr", StandardBasicTypes.INTEGER ) );
        registerFunction( "find_in_set", new StandardSQLFunction( "find_in_set", StandardBasicTypes.INTEGER ) );
        registerFunction( "char_length", new StandardSQLFunction( "char_length", StandardBasicTypes.INTEGER ) );
        registerFunction( "levenshtein", new StandardSQLFunction( "levenshtein", StandardBasicTypes.INTEGER ) );
        registerFunction( "length", new StandardSQLFunction( "length", StandardBasicTypes.INTEGER ) );
        registerFunction( "locate", new StandardSQLFunction( "locate", StandardBasicTypes.INTEGER ) );
        registerFunction( "lcase", new StandardSQLFunction( "lcase", StandardBasicTypes.STRING ) );
        registerFunction( "lower", new StandardSQLFunction( "lower", StandardBasicTypes.STRING ) );
        registerFunction( "right", new StandardSQLFunction( "right", StandardBasicTypes.STRING ) );
        registerFunction( "reverse", new StandardSQLFunction( "reverse", StandardBasicTypes.STRING ) );
        registerFunction( "replace", new StandardSQLFunction( "replace", StandardBasicTypes.STRING ) );
        registerFunction( "repeat", new StandardSQLFunction( "repeat", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_replace", new StandardSQLFunction( "regexp_replace", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_extract", new StandardSQLFunction( "regexp_extract", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_escape", new StandardSQLFunction( "regexp_escape", StandardBasicTypes.STRING ) );
        registerFunction( "parse_url", new StandardSQLFunction( "parse_url", StandardBasicTypes.STRING ) );
        registerFunction( "ltrim", new StandardSQLFunction( "ltrim", StandardBasicTypes.STRING ) );
        registerFunction( "lpad", new StandardSQLFunction( "lpad", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_like", new StandardSQLFunction( "regexp_like", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "ucase", new StandardSQLFunction( "ucase", StandardBasicTypes.STRING ) );
        registerFunction( "upper", new StandardSQLFunction( "upper", StandardBasicTypes.STRING ) );
        registerFunction( "trim", new StandardSQLFunction( "trim", StandardBasicTypes.STRING ) );
        registerFunction( "translate", new StandardSQLFunction( "translate", StandardBasicTypes.STRING ) );
        registerFunction( "substring", new StandardSQLFunction( "substring", StandardBasicTypes.STRING ) );
        registerFunction( "substr", new StandardSQLFunction( "substr", StandardBasicTypes.STRING ) );
        registerFunction( "strright", new StandardSQLFunction( "strright", StandardBasicTypes.STRING ) );
        registerFunction( "strleft", new StandardSQLFunction( "strleft", StandardBasicTypes.STRING ) );
        registerFunction( "split_part", new StandardSQLFunction( "split_part", StandardBasicTypes.STRING ) );
        registerFunction( "space", new StandardSQLFunction( "space", StandardBasicTypes.STRING ) );
        registerFunction( "rtrim", new StandardSQLFunction( "rtrim", StandardBasicTypes.STRING ) );
        registerFunction( "rpad", new StandardSQLFunction( "rpad", StandardBasicTypes.STRING ) );

        // Impala Miscellaneous Functions
        registerFunction( "effective_user", new StandardSQLFunction( "effective_user", StandardBasicTypes.STRING ) );
        registerFunction( "current_database", new StandardSQLFunction( "current_database", StandardBasicTypes.STRING ) );
        registerFunction( "version", new StandardSQLFunction( "version", StandardBasicTypes.STRING ) );
        registerFunction( "uuid", new StandardSQLFunction( "uuid", StandardBasicTypes.STRING ) );
        registerFunction( "user", new StandardSQLFunction( "user", StandardBasicTypes.STRING ) );
        registerFunction( "sleep", new StandardSQLFunction( "sleep", StandardBasicTypes.STRING ) );
        registerFunction( "logged_in_user", new StandardSQLFunction( "logged_in_user", StandardBasicTypes.STRING ) );
        registerFunction( "pid", new StandardSQLFunction( "pid", StandardBasicTypes.INTEGER ) );

        // Impala Aggregate Functions
        registerFunction( "min", new StandardSQLFunction( "min" ) );
        registerFunction( "max", new StandardSQLFunction( "max" ) );
        registerFunction( "appx_median", new StandardSQLFunction( "appx_median" ) );
        registerFunction( "sum", new StandardSQLFunction( "sum", StandardBasicTypes.DOUBLE ) );
        registerFunction( "stddev_samp", new StandardSQLFunction( "stddev_samp", StandardBasicTypes.DOUBLE ) );
        registerFunction( "stddev_pop", new StandardSQLFunction( "stddev_pop", StandardBasicTypes.DOUBLE ) );
        registerFunction( "stddev", new StandardSQLFunction( "stddev", StandardBasicTypes.DOUBLE ) );
        registerFunction( "ndv", new StandardSQLFunction( "ndv", StandardBasicTypes.DOUBLE ) );
        registerFunction( "avg", new StandardSQLFunction( "avg", StandardBasicTypes.DOUBLE ) );
        registerFunction( "count", new StandardSQLFunction( "count", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "group_concat", new StandardSQLFunction( "group_concat", StandardBasicTypes.STRING ) );

        getDefaultProperties().setProperty( Environment.STATEMENT_BATCH_SIZE, DEFAULT_BATCH_SIZE );

        limitHandler = new ImpalaLimitHandler();
    }

    @Override
    public String getAddColumnString() {
        return "add column";
    }

    @Override
    public boolean supportsLockTimeouts() {
        return false;
    }

    @Override
    public String getForUpdateString() {
        return "";
    }

    @Override
    public LimitHandler getLimitHandler() {
        return limitHandler;
    }

    @Override
    public boolean supportsLimit() {
        return true;
    }

    @Override
    public String getLimitString(String sql, boolean hasOffset) {
        return sql + (hasOffset ? " limit ? offset ? " : " limit ?");
    }

    @Override
    public boolean bindLimitParametersFirst() {
        return true;
    }

    @Override
    public boolean supportsIfExistsAfterTableName() {
        return false;
    }

    @Override
    public boolean supportsIfExistsBeforeTableName() {
        return true;
    }

    @Override
    public boolean supportsColumnCheck() {
        return true;
    }

    @Override
    public boolean supportsSequences() {
        return true;
    }

    @Override
    public boolean supportsPooledSequences() {
        return true;
    }

    @Override
    protected String getCreateSequenceString(String sequenceName) {
        return "create sequence " + sequenceName + " start with 1";
    }

    @Override
    protected String getCreateSequenceString(String sequenceName, int initialValue, int incrementSize) throws MappingException {
        if ( supportsPooledSequences() ) {
            return "create sequence " + sequenceName + " start with " + initialValue + " increment by " + incrementSize;
        }
        throw new MappingException( getClass().getName() + " does not support pooled sequences" );
    }

    @Override
    public SequenceInformationExtractor getSequenceInformationExtractor() {
        return SequenceInformationExtractorHSQLDBDatabaseImpl.INSTANCE;
    }

    @Override
    public String getSelectClauseNullString(int sqlType) {
        String literal;
        switch ( sqlType ) {
            case Types.BIGINT:
                literal = "cast(null as bigint)";
                break;
            case Types.BOOLEAN:
                literal = "cast(null as boolean)";
                break;
            case Types.CHAR:
                literal = "cast(null as string)";
                break;
            case Types.DECIMAL:
                literal = "cast(null as decimal)";
                break;
            case Types.DOUBLE:
                literal = "cast(null as double)";
                break;
            case Types.FLOAT:
                literal = "cast(null as float)";
                break;
            case Types.INTEGER:
                literal = "cast(null as int)";
                break;
            case Types.SMALLINT:
                literal = "cast(null as smallint)";
                break;
            case Types.VARCHAR:
                literal = "cast(null as string)";
                break;
            case Types.BLOB:
                literal = "cast(null as string)";
                break;
            case Types.CLOB:
                literal = "cast(null as string)";
                break;
            case Types.TINYINT:
                literal = "cast(null as tinyint)";
                break;
            case Types.TIMESTAMP:
                literal = "cast(null as timestamp)";
                break;
            default:
                literal = "cast(null as string)";
        }
        return literal;
    }

    @Override
    public boolean supportsUnionAll() {
        return true;
    }

    @Override
    public boolean supportsCurrentTimestampSelection() {
        return true;
    }

    @Override
    public boolean isCurrentTimestampSelectStringCallable() {
        return false;
    }

    @Override
    public String getCurrentTimestampSelectString() {
        return "select current_timestamp()";
    }

    @Override
    public String getCurrentTimestampSQLFunctionName() {
        return "current_timestamp";
    }

    @Override
    public boolean supportsCommentOn() {
        return true;
    }

    // Overridden informational metadata ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @Override
    public boolean supportsEmptyInList() {
        return false;
    }

    @Override
    public boolean requiresCastingOfParametersInSelectClause() {
        return true;
    }

    @Override
    public boolean doesReadCommittedCauseWritersToBlockReaders() {
        return true;
    }

    @Override
    public boolean doesRepeatableReadCauseReadersToBlockWriters() {
        return true;
    }

    @Override
    public boolean supportsLobValueChangePropogation() {
        return false;
    }

    @Override
    public String toBooleanValueString(boolean bool) {
        return String.valueOf( bool );
    }

    @Override
    public NameQualifierSupport getNameQualifierSupport() {
        return NameQualifierSupport.SCHEMA;
    }

    @Override
    public boolean supportsNamedParameters(DatabaseMetaData databaseMetaData) throws SQLException {
        return false;
    }

    @Override
    public boolean dropConstraints() {
        return false;
    }

    @Override
    public String getCascadeConstraintsString() {
        return " CASCADE ";
    }
}

 

4. Resolver 등록

 

이제 Custom Database, Dialect, Resolver는 모두 만들었습니다.

그럼 이 클래스들이 hibernate가 사용할 수 있도록 등록해주면 됩니다.

등록하는 코드는 아래와 같습니다.

Properties properties = new Properties();
properties.setProperty("hibernate.dialect_resolvers", "CustomResolver 경로");
localContainerEntityManagerFactoryBean.setJpaProperties(properties);

 

4. 마무리

이번 포스팅에서는 조회 용도의 Impala-Hibernate 조합을 알아보았습니다.

 

많지는 않겠지만 혹시 사용이 필요한 사람들을 위해 코드를 공유합니다.

 

감사합니다.

반응형

'BigData > Impala' 카테고리의 다른 글

Impala - Mybatis  (0) 2021.02.24
Impala  (1) 2021.01.02
반응형

1. 서론

이번 포스팅에서는 앱단에서 mybatis 를 통해 Impala를 사용하는 과정에서 생긴 문제점을 공유하려 합니다.

 

2. 문제점

impala 는 hadoop에 있는 다양한 데이터( = hdfs, hbase, kudu, etc) 들을 SQL을 통해 간편하고 빠르게 조회 및 처리할 수 있도록 도와주는 인터페이스입니다.

 

mybatis는 SQL 매퍼로서 앱단에서 dynamic query를 가능하게 해주며,

xml을 이용하여 SQL과 java와 같은 프로그래밍언어를 분리하도록 도와주는 라이브러리입니다.

 

impala와 mybatis 를 같이 사용하면 간편한 코드작성으로 impala를 통해 hadoop 데이터를 핸들링 할 수 있습니다.

 

하지만, 최신 impala와 mybatis 버전에는 문제점이 하나 있습니다.

바로, java의 LocalDateTime 필드 처리입니다.

 

먼저, mybatis를 이용하여 impala 데이터를 처리하는 과정을 설명하겠습니다.

 

1) LocalDateTimeTypeHandler

 

mybatis는 vo class의 필드를 보고, LocalDateTime 필드의 경우 LocalDateTimeTypeHandler 를 호출하게 됩니다.

public class LocalDateTimeTypeHandler extends BaseTypeHandler<LocalDateTime> {

  @Override
  public void setNonNullParameter(PreparedStatement ps, int i, LocalDateTime parameter, JdbcType jdbcType)
          throws SQLException {
    ps.setObject(i, parameter);
  }

  @Override
  public LocalDateTime getNullableResult(ResultSet rs, String columnName) throws SQLException {
    return rs.getObject(columnName, LocalDateTime.class); // 문제 부분입니다.
  }

  @Override
  public LocalDateTime getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
    return rs.getObject(columnIndex, LocalDateTime.class);
  }

  @Override
  public LocalDateTime getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
    return cs.getObject(columnIndex, LocalDateTime.class);
  }
}

 

코드에서 보이는것과 같이 LocalDateTimeTypeHandler 에서는 ResultSet의 getObject를 사용합니다.

여기서 이 ResultSet은 jdbc 라이브러리에 있는 인터페이스이며, impala는 cloudera 측에서 제공하는 impalaJdbc 라이브러리가 있습니다.

 

2) S41ForwardResultSet

 

cloudera에서 제공하는 jdbc 라이브러리의 ResultSet 구현 클래스입니다.

클래스에는 getObject 메서드가 아래와 같이 구현 되어있고, Util 클래스인 ResultSetUtilities 의 getObjectByType 메서드를 호출하는것을 볼 수 있습니다.

 

public <T> T getObject(int var1, Class<T> var2) throws SQLException {
    try {
        LogUtilities.logFunctionEntrance(this.m_logger, new Object[]{var1, var2});
        this.checkIfOpen();
        return ResultSetUtilities.getObjectByType(this, var1, var2);
    } catch (Exception var4) {
        throw ExceptionConverter.getInstance().toSQLException(var4, this.getWarningListener(), this.getLogger());
    }
}

 

3) ResultSetUtilities

 

아래는 getObjectByType 의 일부 코드입니다.

일부 코드만 봐도 알듯이, mybatis에서 전달한 Class타입을 가지고 알맞게 캐스팅하여 반환하는 로직입니다.

 

하지만, 이 if-else 문에는 아쉽게도 LocalDateTime.class 로 비교하는 구문이 없어 에러가 나게 됩니다.

if (var2.equals(String.class)) {
    return var0.getString(var1);
} else if (var2.equals(Time.class)) {
    return var0.getTime(var1);
} else if (var2.equals(Timestamp.class)) {
    return var0.getTimestamp(var1);

 

반응형

 

3. 해결법

1) cloudera 측 문의

 

소스 한줄추가하면 가능한거라 메일로 문의를 드렸습니다.

하지만, 외국 기업인것도 있으며 오픈소스가 아니기에 제공하는 jdbc에 대한 클라이언트 요청을 전문적으로 받아 픽스하려면

유료 사용을 검토해야만 했습니다.

 

하지만..... 단순히 한줄이면 되었기에 이대로 포기할 수 없었습니다.

 jdbc 라이브러리가 안되면 mybatis handler를 수정하기로 마음먹었습니다.

 

2) mybatis custom handler 등록

 

다행히도, mybatis에는 custom handler를 등록할 수 있도록 제공하고 있었습니다.

 

소스는 아래와 같이, sqlSesqlSessionFactoryBean 에 커스텀 핸들러를 등록하는겁니다.

sqlSessionFactoryBean.setTypeHandlersPackage("커스텀 핸들러 패키지 경로");

 

핸들러는 아래와 같이 등록하였습니다.

 

package 커스텀 핸들러 패키지 경로;

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public class LocalDateTimeTypeHandler extends BaseTypeHandler<LocalDateTime> {

    @Override
    public void setNonNullParameter(PreparedStatement ps, int i, LocalDateTime parameter, JdbcType jdbcType)
            throws SQLException {
        ps.setTimestamp(i, Timestamp.valueOf(parameter));
    }

    @Override
    public LocalDateTime getNullableResult(ResultSet rs, String columnName) throws SQLException {
        Timestamp timestamp = rs.getTimestamp(columnName);
        return getLocalDateTime(timestamp);
    }

    @Override
    public LocalDateTime getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
        Timestamp timestamp = rs.getTimestamp(columnIndex);
        return getLocalDateTime(timestamp);
    }

    @Override
    public LocalDateTime getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
        Timestamp timestamp = cs.getTimestamp(columnIndex);
        return getLocalDateTime(timestamp);
    }

    private static LocalDateTime getLocalDateTime(Timestamp timestamp) {
        if (timestamp != null) {
            return timestamp.toLocalDateTime();
        }
        return null;
    }
}

 

사실, 이 핸들러 코드는 mybatis 마이너버전의 소스입니다.

하지만, LocalDateTime 하나때문에 마이너한 mybatis 버전을 메이저로 올리지 못했기에 mybatis 버전은 메이저로, 문제가 되는 handler는 커스텀하여 해결하였습니다.

 

4. 마무리

apache impala 에서는 timestamp 컬럼을  java 언어에서 사용할 시 LocalDateTime을 사용하라고 권장하고 있습니다.

다만, jdbc에서는 아직 지원이 되지 않아 겪은 문제입니다.

 

하루빨리 jdbc 에도 LocalDateTime 지원이 되어 편하게 개발하는 날이 오길 기대하고 있습니다.

 

감사합니다.

 

 

반응형

'BigData > Impala' 카테고리의 다른 글

Impala - Hibernate  (1) 2021.02.24
Impala  (1) 2021.01.02
반응형

1. 서론

이번 포스팅에서는 Apache Impala에 대해 소개하려고 합니다.

 

Cloudera 도큐먼트를 보며 개인적으로 6.x 버전부터 Impala 와 Kudu 조합을 추천하고 있다고 느꼈습니다.
때문에, Hadoop에 관심이 있으신 분이라면 Impala와 Kudu에 대해 알아보는 것을 추천합니다.

물론, Impala는 Kudu가 아니더라도 File에 대해서도 SQL을 수행할 수 있는 서비스입니다.

 

2. Impala란?

임팔라는 Hadoop Eco에 저장되어 있는 데이터를 실시간 병렬 조회가 가능한 SQL 쿼리 엔진입니다.

 

Impala는 아래와 같이 병렬성을 가질수 있는 아키텍처로 이루어져 있습니다.

 

 

 

1) Impalad

 

Impalad는 impala daemon으로 실제 쿼리를 수행하는 역할을 담당합니다.

 

Impalad는 그림과 같이 Query Compiler, Query Coordinator, Query Executor로 이루어져 있습니다.

 

Compiler는 말그대로 client에서 요청받은 SQL 질의를 Compile 한 후 어떻게 처리할지 결정합니다.

Coordinator는 Compiler가 전달한 요청서를 받아 각각 다른 호스트에 있는 Impalad의 Executor에 요청합니다.

이는 데이터의 locality를 보장하며 이로인해 분산 질의 처리가 가능해지며 성능이 향상됩니다.

 

마지막으로 Executor는 실제로 자신이 담당하는 데이터에 대해서 SQL 질의를 수행하는 역할을 합니다.

질의 수행이 끝난 Executor는 Coordinator에게 결과를 반환합니다.

 

 

여기서, 눈치 채셨겠지만 각 Impalad는 metadata를 가지고 있습니다.

metadata는 필요한 데이터가 어느 호스트(노드)에 있는지에 대한 mapping 정보가 담겨져 있습니다.

 

Impalad가 metadata를 가지고 있기 때문에, Client의 요청은 모든 Impalad가 수신 할 수 있습니다.

 

 

2) StateStore

 

StateStore는 각 Impalad의 상태를 관리하며 데이터 일관성을 위해 Metadata의 변경 사항을 모든 Impalad에 브로드캐스팅하는 역할을 담당합니다.

 

StateStore는 Impala 서비스에서 없다고 동작이 안하진 않습니다.

다만, 클러스터 내에서 Impalad가 제외된 경우 쿼리 가능한 daemon 그룹에서 제외를 못하며, metadata 갱신이 안되는 경우가 발생할 수 있습니다.

 

때문에, 사실상 운영시 StateStore도 띄어야 합니다.

 

3) Catalog Service

 

Catalog Service는 Impala 쿼리로 데이터의 CUD 혹은 DDL 작업 시,

이를 impalad의 metadata에 반영하기 위해 StateStore에 브로드캐스팅해달라고 요청하는 역할을 담당합니다.

브로드캐스팅은 StateStore를 통해서 수행되므로 Catalog Service와 StateStore는 같은 호스트에서 수행하는 것이 좀 더 효율적입니다.

그림에서와 같이 Catalog Service도 metadata가 있으며, 이 metadata가 원본이라고 생각하시면 됩니다.

 

3. Impala 사용시 주의점

Impala는 HMS( Hive MetaStore ) 가 떠있어야 가능합니다.

 

그 이유는 Catalog Service가 가지고 있는 Metadate 관리는 실제로 HMS에서 관리하는 데이터를 복사한것이기 때문입니다.

 

때문에, Hive와 Impala를 같이 사용시 주의할 점이 생기게 됩니다.

 

주의할 점은 HiveQL로 인한 metadata 변경은 impala에서 알 수 없다는 점입니다.

HiveQL로 변경된 데이터는 HMS에는 반영되나 impala에서 원본 metadata를 관리하는 Catalog Service에는 반영이 안되기 때문입니다.

 

하지만, 위에서 말씀드린것처럼 Catalog Service의 metadata 은 HMS에서 Copy한 것입니다.

그러니, HiveQL로 변경된 부분을 Impala metadata에도 반영하고 싶은 경우에는 HMS에서 Copy를 다시 하면 됩니다.

 

다시 변경 부분만을 Copy 해와 Impalad에 브로드캐스팅할 수 있도록 Impala는 REFRESH, INVALIDATE METADATA 쿼리를 제공합니다.

 

그럼, Hive에서도 Impala 쿼리로 인한 변경사항을 반영해야하는 문제가 있을까요?

결국 Metadata의 원본은 HMS이며, Impala에서 변경된 부분은 HMS에도 반영됩니다.

또한, Hive는 매 질의시 HMS에서 Metadata를 조회해 처리하기 때문에 Hive는 REFRESH,INVALIDATE METADATA 와 같은 작업을 하지 않아도 됩니다.

 

반응형

 

4. Hive 와 Impala 차이점

Hive와 Impala의 차이점은 동작의 차이로 인해 발생합니다.

Hive는 SQL을 통해 편하게 MR을 수행하는 서비스입니다.

Impala는 MR로 동작하지 않습니다. 위에서 소개한것과 같이 Impala는 MR이 아닌 SQL 엔진을 통해 동작합니다.

 

이로인해, Impala는 Hive에 비해 처리속도가 빠릅니다.

그렇다고, Hive를 모두 Impala 로 대체하는것은 옳바르진 않습니다.

 

우선, Hive, Impala 모두 OLAP 적합한 서비스입니다. 다만, Impala는 SQL 처리가 메모리상에서 이루어집니다.

때문에, 호스트에 메모리가 부족하거나 메모리를 너무 과도하게 차지하는 쿼리의 경우에는

Hive를 사용하는것이 클러스터에 부담도 주지않으며 좋은 선택일 수 있습니다.

물론, Hive는 MR이기에 Spark로 대체하는 방법도 좋은 방안입니다.
하지만, Spark도 기본은 메모리 연산이기에 각 요구사항에 맞게 선택하여 사용해야 합니다.

 

또한, Hive, Impala 는 OLAP에 적합한 서비스라고 했습니다. 하지만 이는 Hdfs 파일에 대해서 쿼리를 날리는 경우입니다.

 

Impala 를 SQL 인터페이스로 사용하지만 실제 데이터는 Kudu 혹은 Hbase에 있다면, 쿼리는 Kudu, Hbase 특성에 맞게 작성해야합니다.

예를들어, Kudu의 경우에는 PK에 대해서 B+Tree로 index를 가지고 있기 때문에 Where 절에 PK 넣는 쿼리의 경우에는 데이터가 대용량이 아니더라도 응답속도가 빠르게 나오게 됩니다.

impalad 가 Kudu Master 혹은 Hbase Master에 요청하여 데이터를 가져오기 때문입니다.
이런 경우에는 metadata도 사실상 무의미하며 해당 데이터를 담당하는 Kudu, Hbase 영역으로 넘어가게 됩니다.

하지만, 이런 저장소에서 제공하는 별도의 클라이언트를 사용하지 않고 Impala 를 통해 간편하게 SQL로 데이터를 Handling할 수 있다는 점이 Impala의 장점입니다.

 

5. HAProxy

Client의 요청은 모든 Impalad가 수신 할 수 있다고 했습니다.

때문에, 관리자 입장에서는 하나의 Impalad에만 요청이 쏠리지 않도록 HAProxy를 구축해야 합니다.

 

HAProxy를 구축하면 얻는 이점은 아래와 같습니다.

 

  1. 부하 분산
  2. Client의 요청을 단일 endpoint 로 관리 가능
    1. Impalad가 추가된다면 HAProxy에 추가된 Impalad 호스트만 추가하면 됩니다.
Cloudera Impala Document 에서는 무료 오픈소스인 haproxy 를 소개하고 있습니다.

 

6. 마무리

이번 포스팅에서는 Impala에 대해 알아봤습니다.

 

부족한 설명이였지만 읽어주셔서 감사합니다.

반응형

'BigData > Impala' 카테고리의 다른 글

Impala - Hibernate  (1) 2021.02.24
Impala - Mybatis  (0) 2021.02.24
반응형

1. 서론

이번 포스팅에서는 Apache Hadoop Eco 저장소 중 하나인 Kudu 에 대해 소개하려고 합니다.

개인적으로 최근 Kudu 를 사용하는 회사들이 늘어나는거 같습니다.
아무래도, cloudera 측에서 impala와의 연계 저장소로 추천하고 있다는 점이 크다고 봅니다.

 

소개 목차로는 아래와 같습니다.

 

  • Kudu란?
  • Hbase와의 차이점
  • Kudu 지원 & 미지원

2. Kudu란?

Kudu는 Hadoop Eco 저장소 중 하나이며, Columnar Storage 입니다.

Columnar Storage인 이유는 Mongo, Hbase와 같이 schemaless 여서가 아니며, 물리적으로 Column 별로 파일에 저장하기 때문입니다.

 

Kudu 는 분산 플랫폼으로 아래와 같은 아키텍처를 가지고 있습니다.

 

 

1) Tablet

 

Kudu에서 테이블은 파티셔닝 테이블입니다.

파티셔닝 테이블이란 특정 column을 기준으로 데이터를 나눠 저장한다는 의미입니다.

 

바로 이 파티셔닝 테이블에서 하나의 파티셔닝이 Tablet 입니다.

데이터를 샤딩하는것과 같으며, 하나의 샤딩이라고 보시면 됩니다.

 

이 Tablet은 Leader, Follower로 이루어져 있습니다.

모든 Write 요청은 Leader가 받으며, Read 요청은 Leader, Follower 모두 받습니다.

 

Leader, Follower에서 Read가 가능하게 하기 위해 Write의 동작방식은 아래와 같이 이루어지게 됩니다.

 

 

 

 

데이터 유실을 방지하기 위해 가장 먼저 Leader의 WAL에 쓰기 작업을 하며,

그 이후는 각 Follwer에게 쓰기를 요청하여 모두 성공한 경우 클라이언트에게 성공 응답을 보내게 됩니다.

개인적으로 이런 부분은 분산 플랫폼 중 Kafka와 유사하다고 볼 수 있습니다.

 

2) Tablet Server

 

Tablet Server는 Tablet 을 가지고 있는 서버를 말하며, 여러개의 Tablet 을 가지고 있습니다.

 

Tablet Server는 크게 Master 서버와 Tablet 서버로 나누어지며,

각 역할은 Hdfs의 Name 노드와 Data 노드와 같이 Tablet의 메타데이터는 Master Server, 실제 데이터는 Tablet Server에 있습니다.

Master 서버의 메타데이터 또한 Leader, Follwer 구조로 이루어져 있습니다.

 

3) MRS

 

MRS는 MemRowSet으로 WAL에 데이터 Write 후 써지는 저장소입니다.

메모리로 이루어져 있으며 B+ tree로 구성되어져 있습니다.

 

아래는 MRS 그림입니다.

 

 

 

4) DRS

 

MRS가 일정 시간 혹은 크기가 넘어가게 되면 Disk에 Write를 하게됩니다.

바로 Write하는 곳이 DiskRowSet이며 DRS 라고 합니다.

 

5) DeltaMemeStore

 

DeltaMemStore는 Update 요청으로 데이터 변경분을 주기적으로 Redo 파일에 Write하는 메모리 영역입니다.

 

Kudu는 MRS, DRS, DeltaMemStore와 같이 중간에 메모리와 디스크를 통해 영구적인 데이터 보존과 함께, 속도까지 겸비한것을 알 수 있습니다.

 

하지만, 조회를 하는 클라이언트 입장에서는 MRS, DRS, RedoFile 등등 조회할 부분이 많습니다.

이를 위해, Kudu는 compaction 작업을 통해 조회시 접근할 메모리 영역과 디스크 영역을 최소화합니다.

 

 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

 

3. Hbase와의 차이점

 

Kudu와 Hbase는 비슷한 점이 많습니다.

비슷한 부분은 Tablet <-> Region, Tablet Server <-> Region Server, Columnar Storage 등이 있습니다.

 

하지만 apache 깃헙을 가보시면 Hbase, Kudu는 별도의 프로젝트이며, 엄연히 사용처가 다른 것을 짐작할 수 있습니다.

 

Hbase와 Kudu의 가장 큰 차이점은 자료구조입니다.

Hbase는 LSM인 Log Structured Merge 방식을 사용하며, Kudu는 B+tree 방식의 MRS, DRS를 사용합니다.

 

이로인해, key를 통해 read하는 경우 Kudu가 Hbase에 비해 속도가 우수합니다.

물론, Hbase의 경우에도 Read 성능을 올리기위해 저장파일인 HFile이 Multi-layerd index 방식으로 이루어져 있습니다. 

 

두번째 차이점으로는 저장 파일단위입니다.

Hbase는 HFile, Kudu는 CFile로 데이터를 저장합니다.

 

이로인해, table의 컬럼별 집계와 같은 aggregation 쿼리의 경우 Kudu가 File IO가 적게 들어 우수한 성능이 나옵니다.

 

이러한 차이점으로 인해 Kudu가 우수한 부분도 있지만 Hbase가 우수한 부분도 있습니다.

 

첫째 Write 성능

 

Hbase는 MemStore, LSM 의 방식으로 인해 Write의 경우 Kudu보다 성능이 좀 더 우수합니다.

 

두번째 Scan 성능

 

Hbase의 경우 내부적으로 SortedMap 과 같이 정렬하여 데이터를 저장하고 있습니다.

이는, Scan 연산시 kudu에 비해 유리하게 작동할 수 있습니다.

 

세번째 컬럼별 버저닝

 

Hbase는 컬럼별 버저닝을 지원하고 있습니다. 이는 kudu에 비해 우수한 부분이기 보단 차이점으로,

이력 데이터를 관리하기에 용이한 저장소입니다.

 

4. Kudu 지원 & 미지원

kudu는 현재 지원되는 부분과 지원되지 않는 부분이 있습니다.

 

지원되는 부분으로는 아래와 같습니다.

 

  1. compound primary key
  2. single row transaction

미지원 항목은 아래와 같습니다.

 

  1. secondary indexes
  2. multi row transaction
  3. TTL(Time-to-Live)

 

5. 마무리

이번 포스팅에서는 Kudu에 대해 알아봤습니다.

 

Kudu가 Hbase의 대체제는 아니며

서비스 특성에 따라 Kudu 가 잘 맞을수도, Hbase가 잘 맞을수도 있습니다.

 

좋은 서비스를 위해서는 각 저장소의 특징을 알고, 적절하게 선택하여 사용해야 합니다.

 

감사합니다.

반응형
반응형

1. 서론

이번 포스팅에서는 Hbase가 제공하는 클라이언트 API : 관리 기능에 대해 알아보겠습니다.

 

Hbase는 데이터를 조작하는 API 뿐 만이 아닌 데이터 정의를 위한 클라이언트 API도 제공하고 있습니다.

 

2. 스키마 정의

Hbase에서는 테이블 생성 시 테이블 및 컬럼패밀리의 스키마를 정의해야 합니다.

 

1) 테이블

 

Hbase는 데이터를 저장하는 가장 상위의 개념으로 테이블을 사용합니다.

이런, 테이블의 특성을 클라이언트에서는 아래와 같은 지시자 클래스로 제공합니다.

 

TableDescriptorBuilder.newBuilder(TableName tableName).build();
TableDescriptorBuilder.newBuilder(TableDescriptor tableDescriptor).build();

TableName tableName = TableName.valueOf("tableName");
TableDescriptor tableDescriptor = new ModifyableTableDescriptor(tableName);

 

Hbase 의 테이블 명은 hdfs의 실제 디렉터리로 생성이되기 때문에 파일명 규칙에 따라 만들어야 합니다.

 

Hbase의 테이블은 논리적인 개념이며, 물리적으로는 여러개의 리전으로 분리가 되며 각 리전은 Hbase Region Server에 존재하게 됩니다.

 

아래는 위 설명을 간략하게 나타낸 그림입니다.

 

 

2) 테이블 속성

 

테이블 지시자 클래스에는 속성 설정을 위한 Getter, Setter를 제공합니다.

 

이름

 

아래와 같이 테이블 이름을 가져올 수 있습니다.

 

byte[] tableName1 = tableDescriptor.getTableName().getName();
String tableName2 = tableDescriptor.getTableName().getNameAsString();

 

수정의 경우 마이너 버전에서는 setName으로 가능했으나 현재는 테이블을 새로 생성하여 migration하는 방법을 권장하고 있습니다.

 

컬럼패밀리

 

테이블 정의 시 가장 중요한 부분은 컬럼패밀리입니다.

 

컬럼패밀리의 경우 아래와 같은 API를 통해 조작이 가능합니다.

 

tableDescriptor.getColumnFamilies();
tableDescriptor.getColumnFamily(byte[] column);
tableDescriptor.hasColumnFamily(byte[] column);
Admin admin = connection.getAdmin();
admin.addColumnFamily();
admin.deleteColumnFamily();

 

파일 최대 크기

 

테이블내의 리전이 커질 수 있는 최대 파일 크기 또한 조작이 가능합니다.

 

API 명세는 아래와 같습니다.

 

tableDescriptor.getMaxFileSize();
((ModifyableTableDescriptor) tableDescriptor).setMaxFileSize(1024000);

 

최대 크기 설정은 리전 크기가 해당 값에 도달했을때 시스템이 리전을 분할하는 기준이 됩니다.

 

 

읽기 전용

 

쓰기가 아닌 일기 전용 테이블을 만들어야 하는 경우 아래와 같은 API를 사용하면 됩니다.

 

tableDescriptor.isReadOnly();
((ModifyableTableDescriptor) tableDescriptor).setReadOnly(true);

 

 

 

멤스토어 플러시 크기

 

아래는 멤스토어에 있는 데이터를 HFile로 write하는 트리거링을 바이트 단위로 조작하고 싶은 경우 사용하는 API 입니다.

 

tableDescriptor.getMemStoreFlushSize();
((ModifyableTableDescriptor) tableDescriptor).setMemStoreFlushSize(102400);

 

 

3) 컬럼패밀리

 

TableDescriptor 와 같이 컬럼패밀리에 대한 설정을 담고 있는 클래스는 ColumnFamilyDescriptor 입니다.

 

컬럼패밀리 또한 저장소 계층의 디렉터리 이름으로 사용되기 때문에 파일명 규칙을 따라 생성해야 합니다.

 

아래는 컬럼패밀리 지시자 클래스 정의입니다.

 

ColumnFamilyDescriptor columnFamilyDescriptor1 = 
	ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("")).build();
ColumnFamilyDescriptor columnFamilyDescriptor2 = 
	ColumnFamilyDescriptorBuilder.newBuilder(new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes(""))).build();

 

이름

 

아래와 같이 이름을 읽을 수 있습니다.

 

byte[] name1 = columnFamilyDescriptor.getName();
String name2 = columnFamilyDescriptor.getNameAsString();

 

Setter를 통해 이름을 설정할 수 는 없습니다.

 

최대 버전 갯수

 

컬럼패밀리 별로 값의 버전을 몇 개까지 보유할 지 지정할 수 있습니다.

 

columnFamilyDescriptor.getMaxVersions();
((ModifyableColumnFamilyDescriptor)columnFamilyDescriptor).setMaxVersions(100);

 

압축

 

컬럼패밀리에 저장된 데이터에 특정 압축기법을 적용할 수 있습니다.

 

Algorithm algorithm1 = columnFamilyDescriptor.getCompactionCompressionType();
Algorithm algorithm2 = columnFamilyDescriptor.getCompressionType();
((ModifyableColumnFamilyDescriptor)columnFamilyDescriptor).setCompactionCompressionType(Algorithm.GZ);
((ModifyableColumnFamilyDescriptor)columnFamilyDescriptor).setCompressionType(Algorithm.GZ);

 

Algorithm 은 Hbase 클라이언트에서 제공하는 enum으로 아래 값들이 있습니다.

 

  • LZO
  • GZ
  • NONE
  • SNAPPY
  • LZ4
  • BZIP2
  • ZSTD
기본은 NONE으로 압축하지 않습니다.

 

블록크기

 

Hbase는 컬럼패밀리별로 HFile을 저장하게 됩니다.

그렇기 때문에, HFile의 블록 크기를 컬럼패밀리 지시자를 통해 조작이 가능합니다.

 

columnFamilyDescriptor.getBlocksize();
((ModifyableColumnFamilyDescriptor)columnFamilyDescriptor).setBlocksize(1024);

 

기본 HFile의 블록크기는 64KB로 기본 HDFS의 블록 크기인 64MB에 비교하여 1/1024 의 크기입니다.

 

블록 캐시

 

Hbase는 I/O 자원을 효율적으로 사용하기 위해 scan 연산 시 모든 블록을 읽어 메모리에 상주시켜놓고 재사용하게 합니다.

메모리에 올렸기 때문에 디스크에는 다시 접근하지 않아 I/O를 절약합니다.

 

columnFamilyDescriptor.isBlockCacheEnabled();
((ModifyableColumnFamilyDescriptor) columnFamilyDescriptor).setBlockCacheEnabled(false);

 

유효기간

 

Hbase 에서는 컬럼패밀리에 속한 데이터의 버전 뿐만이 아닌 유효기간도 설정도 가능합니다.

 

기본은 영원히 저장하도록 되어 있으며, 유효기간을 지정하게 되면 주 컴팩션시  유효기간이 지난 데이터들을 찾아 삭제합니다.

 

columnFamilyDescriptor.getTimeToLive();
((ModifyableColumnFamilyDescriptor) columnFamilyDescriptor).setTimeToLive(1000);

 

인메모리

 

위 블록캐시에서 말한 메모리에 대한 설정도 가능합니다.

 

columnFamilyDescriptor.isInMemory();
((ModifyableColumnFamilyDescriptor) columnFamilyDescriptor).setInMemory(true);

 

 

반응형

 

 

3. HBaseAdmin

Hbase에는 RDBMS의 DDL과 흡사한 기능을 제공하는 HBaseAdmin 클래스를 제공합니다.

 

HBaseAdmin 인스턴스는 아래와 같이 가져올 수 있습니다.

 

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "");

Connection connection = ConnectionFactory.createConnection(conf);
HBaseAdmin hBaseAdmin = (HBaseAdmin) connection.getAdmin();

 

1) 기본 기능

 

HBaseAdmin은 아래와 같은 기본 기능을 제공합니다.

 

1. getConnection()

 

연결 인스턴스를 반환하는 메소드입니다.

 

2. getConfiguration()

 

HBaseAdmin 인스턴스를 생성할 때 사용된 설정 인스턴스를 반환합니다.

 

3. close()

 

HBaseAdmin 인스턴스가 점유하고 있는 모든 자원을 해제합니다.

 

 

2) 테이블 관련 기능

 

HBaseAdmin은 DDL과 같이 테이블 생성을 위한 메서드를 제공합니다.

 

1. 생성

 

void createTable(TableDescriptor desc) throws IOException;
void createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException;
void createTable(final TableDescriptor desc, byte [][] splitKeys);

 

메서드 2번째를 보시면 테이블 생성 시 row key의 시작키와 끝키를 지정하여 특정 리전에 존재하는 테이블을 생성시킬 수 있습니다.

이때, 시작키는 끝키보다 작아야하며, 기본적으로 Hbase 의 리전은 시작키를 포함하고 끝키는 포함하지 않습니다.

numResions는 테이블이 확보해야하는 최소 리전 수를 의미하며
최소 3 이상이어야 합니다. 

 

아래는 테이블 생성하는 간단한 예제입니다.

 

Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
HBaseAdmin hBaseAdmin = (HBaseAdmin) connection.getAdmin();
TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf("testTable")).build();
hBaseAdmin.createTable(desc);

 

 

2. 조회

 

테이블이 존재하는지, 혹은 지정한 테이블의 스키마 정의를 아래와 같은 메서드를 통해 가져올 수 있습니다.

 

boolean tableExists(final TableName tableName) throws IOException;
HTableDescriptor[] listTables() throws IOException;
HTableDescriptor[] listTables(Pattern pattern) throws IOException;
HTableDescriptor[] listTables(String regex) throws IOException;
HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException;

 

3. 삭제

 

테이블 삭제는 아래 메서드를 사용하시면 됩니다.

 

void deleteTable(final TableName tableName) throws IOException;

 

4. 비활성

 

HBase에서는 테이블 삭제 전 필수로 비활성을 시켜야 합니다.

 

비활성 메서드는 아래와 같습니다.

 

void disableTable(final TableName tableName) throws IOException;

 

비활성화 메서드는 아래와 같은 동작을 HBase에게 수행하도록 합니다.

 

  1. 모든 리전 서버에서 아직 적용하지 않은 변경사항 처리
  2. 모든 리전 닫음
  3. 해당 테이블의 모든 리전이 어떤 서버에도 배치되어 있지 않다는 정보를 메타( .META. ) 테이블에 기록

 

5. 활성화

 

HBaseAdmin은 테이블의 활성화 메서드도 제공하고 있습니다.

 

void enableTable(final TableName tableName) throws IOException;

 

6. 확인

 

지정 테이블이 활성화되어 있는지, 비활성화 되어 있는지 등의 상태를 확인하는 메서드는 아래와 같습니다.

 

boolean isTableEnabled(final TableName tableName) throws IOException;
boolean isTableDisabled(TableName tableName) throws IOException;
boolean isTableAvailable(TableName tableName) throws IOException;

 

isTableAvailable 메서드의 경우에는 테이블의 활성화 상태가 아닌 단지 존재하는지를 체크하는 메서드입니다.

 

7. 변경

 

테이블에 대한 정의를 변경도 가능합니다.

 

void modifyTable(TableDescriptor td) throws IOException;

 

단, 변경도 삭제와 동일하게 먼저 테이블을 비활성화 시킨 후 수행하여야 합니다.

 

 

3) 스키마 관련 기능

 

HBaseAdmin은 테이블의 컬럼 패밀리에 대한 정의도 변경 가능합니다.

 

void addColumnFamily(final TableName tableName, final ColumnFamilyDescriptor columnFamily) throws IOException;
void deleteColumnFamily(final TableName tableName, final byte[] columnFamily) throws IOException;
void modifyColumnFamily(final TableName tableName, final ColumnFamilyDescriptor columnFamily) throws IOException;

 

4) 클러스터 관련 기능

 

HBaseAdmin 클래스는 클러스터에 관련된 정보를 확인 및 조작할 수 있는 메서드를 제공합니다.

 

1. void flush(final TableName tableName) throws IOException

 

멤스토어에 있는 정보들을 강제로 disk write 하도록 수행하는 메서드입니다.

 

2. void compact(final TableName tableName) throws IOException

 

지정한 테이블을 컴팩션 대기열에 넣는 메서드입니다.

 

3. void majorCompact(final TableName tableName) throws IOException

 

지정 테이블에 대해 주 컴팩션이 일어나도록 트리거링하는 메서드입니다.

 

4. void split(final TableName tableName) throws IOException

 

지정 테이블에 대해 분할 작업을 수행시키는 메서드입니다.

지정 테이블의 모든 리전을 이터레이트하여 분할 작업을 자동적으로 호출합니다.

 

5. void assign(final byte [] regionName) throws

MasterNotRunningException, ZooKeeperConnectionException, IOException

 

리전을 할당할 때 사용하는 메서드입니다.

 

6. void unassign(final byte [] regionName, final boolean force) throws IOException

 

assign과 반대로 리전을 해제할 때 사용하는 메서드입니다.

 

7. void move(final byte[] encodedRegionName, ServerName destServerName) throws IOException

 

지정 리전을 특정 서버로 이동시키는 메서드입니다.

 

즉, 클라이언트는 능동적으로 특정 리전을 현재 배치된 서버가 아닌 다른 서버로 옮길 수 있습니다.

 

8. boolean balancerSwitch(final boolean on, final boolean synchronous) throws IOException

 

리전 밸런서를 키거나 끄는 메서드 입니다.

리번 밸런서랑 해당 리전이 지정 크기보다 커지는 경우 분할 시켜 다른 리전 서버로 보내는 역할을 합니다.

 

9. synchronized void shutdown() throws IOException

 

클러스터를 중단할 때 사용합니다.

 

10. synchronized void stopMaster() throws IOException

 

마스터 서버를 중단할 때 사용합니다.

 

11. synchronized void stopRegionServer(final String hostnamePort) throws IOException

 

특정 리전 서버만을 중단 할 때 사용합니다.

 

4. 마무리

이번 포스팅에서는 클라이언트 API : 관리 기능에 대해 진행하였습니다.

 

다음 포스팅에서는 챕터 6장인 클라이언트 종류에 대해 진행하겠습니다.

 

반응형

'BigData > Hbase' 카테고리의 다른 글

(4) 클라이언트 API : 고급 기능  (0) 2020.04.19
(3) 클라이언트 API : 기본 기능  (0) 2020.04.09
(2) 설치  (0) 2020.04.08
(1) 소개  (0) 2020.04.07
반응형

1. 서론

이번 포스팅에서는 앞 포스트에서 설치한 CDH와 개별 어플리케이션과의 연동 방법에 대해 알아 보도록하겠습니다.

 

2. 스파크 클라이언트

MR, Spark 모두 Job을 CDH 서버에서 제출하여 수행해도 됩니다.

 

하지만 일반적으로, CDH 서버군들은 YARN, HDFS, SPARK 등 처리와 관리를 위한 용도로만 프로세스를 띄우게 하고

별도 클라이언트 어플리케이션을 수행하는 서버군이 있습니다.

 

그리고, 클라이언트는 Job을 YARN에게 제출하여 CDH에서 작업을 처리하도록 한 후 결과를 받습니다.

 

아래는 간단한 클라이언트와 CDH 간의 그림입니다.

 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

 

3. CDH와 연동

 

클라이언트 서버에서 원하는 서비스들을 별도로 설치하여 사용할 수 있습니다.

 

하지만, 클라이언트에 CDH의 각 서비스 설정을 옮겨야 하는 번거로움이 있으며 실수할 수 있는 포인트가 생깁니다.

또한, 클라이언트와 CDH의 서비스 버전이 상이한것보다는 동일하게 운영 시 위험성이 낮습니다.

여기서 서비스라고 하는 것들은 Hdfs, Yarn, Spark 등등을 의미합니다.

 

저는 위와 같은 이유로 CDH 에서 한 서버를 선택하여 rsync를 통해 설정 정보를 모두 가져오도록 했습니다.

 

방법은 아래와 같습니다.

 

1) 터널링

 

먼저 CDH 중 서버 한개와 클라이언트 서버와 터널링을 진행합니다.

 

아래와 같이 ssh-keygen을 통해 클라이언트 서버에 public key를 생성합니다.

 

ssh-keygen
cd ~/.ssh
cat id_rsa.pub

 

 

cat으로 출력 된  public key를 CDH 서버 한대의 .ssh 디렉터리로 이동하여 authorized_keys 파일에 추가해줍니다.

 

이제 수동으로 클라이언트 서버에서 CDH 서버로 ssh 연결을 한번 수행 해줍니다.

 

ssh CDH-10-server

 

 

2) rync

 

CDH의 경우 /opt/cludera/parcels 로 설정 정보와 lib 등등 모든 정보를 놓습니다.

 

하지만, 루트 디렉터리에 용량이 꽉차 disk full 이 날 수 있으니 아래와 같이 큰 disk가 마운트 되어 있는 디렉터리로 심볼릭링크를 걸어 줍니다.

 

sudo mkdir -p /home1/irteamsu/opt/cloudera
sudo ln -s /home1/irteamsu/opt/cloudera /opt/cloudera

 

그리고 이제 rync를 통하여 데이터를 가져와 줍니다.

 

 

3) PATH 설정

 

다 가져오셨다면 이제 PATH 설정을 해주기 위해 아래와 같이 bash_profile 을 수정합니다.

 

vim ~/.bash_profile

 

.bash_profile 파일을 vim 에디터로 엽니다.

 

아래와 같이 설정을 추가합니다.

 

HADOOP_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554
SPARK_HOME=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/spark
export HADOOP_CONF_DIR=/opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/lib/hadoop/etc/hadoop
export PATH=$HADOOP_HOME/bin::$SPARK_HOME/bin:$PATH

 

CDH 버전이 다르다면 위 경로를 자신의 버전에 맞게 바꿔서 사용하시면 됩니다.

 

마지막으로 아래와 같이 source 명령어를 통해 .bash_profile 의 내용을 반영합니다.

 

source ~/.bash_profile

 

4) 확인

 

아래와 같이 hdfs 명령어와 spark-shell 명령어가 정상 동작하는것을 확인할 수 있습니다.

 

추가로 hdfs 명령어 시 CDH에 있는 hdfs 에 있는 디렉터리가 나오는것을 볼 수 있습니다.

 

 

위와 같은 방법으로 클라이언트를 만드는 것의 단점이 있습니다.

 

바로 rsync로 지정한 디렉터리에 있는 모든 설정을 다 가져온다는 것 입니다.

클라이언트 서버는 spark를 사용하지 않는데도 spark 설정이 들어오게 되는 것 처럼요.

 

또한, 클라이언트가 추가 될 때마다 터널링을 수행해야 한다는 점입니다.

 

이러한 문제점들은 사실상 파일 서버를 두어 설정 파일들을 중앙 관리하도록 한다면 크게 문제가 되지 않을 것입니다.

 

지금은 예제를 위해 간단히 rsync를 통해 가져오도록 해보았습니다.ㅎㅎ

 

4. 마무리

 

이렇게, Spark에 대한 포스팅을 완료했습니다.

 

다음 포스팅에서는 Hdfs기반 Nosql로 많이 사용하는 Hbase에 대해 진행하도록 하겠습니다.

반응형

'BigData > Spark' 카테고리의 다른 글

(7) Apache Zeppelin  (0) 2020.04.20
(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
반응형

1. 서론

이번 포스팅에서는 Apache Zeppelin 대해 알아 보도록하겠습니다.

 

2. Apache Zeppelin 이란?

Apache Zeppelin이란 Spark를 Web 기반 NoteBook으로 간편히 수행할 수 있게 제공하는 어플리케이션입니다.

 

ide를 통해 할 수도 있지만 Web 기반으로 어디서든 접근하여 간편히 Spark 로직을 생산할 때 많이 사용하는 도구입니다.

 

3. 설치 및 CDH와 연동

CDH ( = Cloudera's Distribution for Hadoop )에서 사용하는 CM (= Cloudera Manager)에서는 안타깝게도 Apache Zeppelin 설치를 제공하고 있지 않습니다.

 

HDP ( = Hortonworks Data Platform ) 에서 사용하는 Ambari 에서는 지원하고 있습니다.
Ambari에서 설치 법은 아래 URL에 나와있습니다.
https://docs.cloudera.com/HDPDocuments/HDP3/HDP-3.1.5/installing-zeppelin/content/installing_apache_zeppelin.html

 

때문에, 수동으로 설치하여 CDH와 연동하여 사용하여야 합니다.

 

 

1) 다운로드

 

저는 CDH 중 호스트 한개를 선택하여 설치를 진행하였습니다.

 

먼저 아래와 같이 zeppelin 을 다운받습니다.

( 현재 포스팅 시점에서는 zeppelin-0.9.0-preview1 가 최신 버전이지만 많이 사용하는 0.8.1 버전으로 진행하도록 하겠습니다. )

 

wget http://apache.mirror.cdnetworks.com/zeppelin/zeppelin-0.8.1/zeppelin-0.8.1-bin-all.tgz

 

다운받은 압축파일을 해제합니다.

 

tar -zxvf zeppelin-0.8.1-bin-all.tgz

 

Zeppelin의 경우 기본경로를 /opt/zeppelin으로 잡기 때문에 압축해제한 폴더를 해당 경로로 옮겨줍니다.

 

sudo mkdir -p /opt/zeppelin
sudo mv ./zeppelin-0.8.1-bin-all /opt/zeppelin

 

 

2) 설정 파일 수정

 

먼저 /opt/zeppelin/zeppelin-0.8.1-bin-all/conf 디렉터리로 이동합니다.

후에, 아래 3개 파일을 모두 cp 명령어를 통해 postfix .template를 제거한 파일을 만들어 줍니다.

 

cp shiro.ini.template shiro.ini
cp zeppelin-env.sh.template zeppelin-env.sh
cp zeppelin-env.cmd.template zeppelin-env.cmd

 

3개 파일을 모두 수정할건 아니지만 사용자가 필요시에 수정할 파일이기 때문에 미리 cp를 통해 만들어 주었습니다.

 

이제 zeppelin-env.sh 파일을 열어 아래와 같이 설정 해주시면 됩니다.

 

설정 값을 CDH의 parcels를 통해 설치한 경로로 잡아줘야 합니다.
제플린을 통해 스파크 코드가 CDH 노드들로 수행되기 위함입니다.

 

export JAVA_HOME="/usr/java/jdk1.8.0_181-cloudera"
export SPARK_HOME="/opt/cloudera/parcels/CDH/lib/spark"
export HADOOP_CONF_DIR="/etc/hadooc/conf"

 

기본 설정은 이걸로 끝입니다ㅎㅎㅎ

 

3) 실행

 

이제 bin 디렉터리인 /opt/zeppelin/zeppelin-0.8.1-bin-all/bin

으로 이동 후 zeppelin-daemon.sh start 명령어만 수행해주면 됩니다.

 

cd /opt/zeppelin/zeppelin-0.8.1-bin-all/bin
./zeppelin-daemon.sh start

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

4. 사용 예제

이제 위까지 진행하셨다면 기본 포트인 8080으로 접속하시면 아래와 같은 화면을 볼 수 있습니다.

 

 

1) 로그인하기

 

로그인은 앞에서 봤던 shiro.ini 파일에 각 인증, 권한을 허용하는 유저를 등록할 수 있습니다.

앞에서는 딱히 건들지 않았기 때문에 아래와 같은 기본 유저를 사용하시면 됩니다.

 

 

저는 첫번째인 user1 에 password2로 로그인하였습니다.

 

2) 노트 생성하기

 

로그인을 했으니 아래 사진의 create new note 를 클릭하여 추가합니다.

 

저는 feeder라는 디렉터리에 compare를 추가했습니다.

 

 

3) pyspark 코드 실행

 

생성한 노트에 들어가면 아래와 같은 화면이 나옵니다.

 

 

pyspark를 사용하기 위해 %spark.pyspark 를 맨 상단에 추가해주시면 됩니다.

 

아래는 간단하게 rdd를 만들어서 출력해보는 예제입니다.

 

 

5. 마무리

 

이번에는 Apache Zeppelin 에 대해서 설치와 간단한 사용 예제를 포스팅하였습니다.

다음에는 CDH와 스파크 클라이언트 연동에 대해 포스팅하겠습니다.

반응형

'BigData > Spark' 카테고리의 다른 글

(8) CDH와 스파크 클라이언트 연동  (0) 2020.04.21
(6) 스트럭처 스트리밍  (0) 2020.03.31
(5) 스파크 스트리밍  (0) 2020.03.19
(4) 스파크 SQL  (0) 2020.03.17
(3) 스파크 설정  (0) 2020.03.13
반응형

1. 서론

 

이번 포스팅에서는 Hbase가 제공하는 클라이언트 API 중 고급 기능에 대해 알아보겠습니다.

 

2. 필터

Hbase에서는 Get 말고도 필터를 이용하여 데이터를 조회하여 가져올 수 있습니다.

 

1) 필터 소개

 

Hbase 클라이언트는 Filter라는 추상클래스와 여러가지 구현클래스를 제공하고 있습니다.

 

또한, 개발자는 직접 Filter 추상 클래스를 구현하여 사용할 수 있습니다.

 

모든 필터는 실제로 서버측에 적용이 되어 수행되어 집니다.

클라이언트에서 적용되는 경우에는 많은 데이터를 가져와 필터링해야 하기 때문에 대규모 환경에서는 적합하지 않습니다.

 

아래는 필터가 실제로 어떻게 적용되는지 보여줍니다.

 

 

클라이언트에서 필터 생성 -> RPC로 직렬화한 필터 서버로 전송 -> 서버에서 역직렬화하여 사용

 

 

2) 필터의 계층 구조

 

Filter 추상클래스가 최상단이며 추상클래스를 상속받아 뼈대를 제공하는 추상클래스로 FilterBase가 있습니다.

Hbase에서 제공하는 구현 클래스들은 FilterBase를 상속하고 있는 형태입니다.

 

 

3) 비교 필터

 

Hbase가 제공하는 필터 중 비교연산을 지원하는 CompareFilter가 있습니다.

 

생성자 형식은 아래와 같습니다.

 

CompareFilter(final CompareOperator op, final ByteArrayComparable comparator)

 

 

CompareOperator 에는 [LESS, LESS_OR_EQUAL, EQUAL, NOT_EQUAL, GREATER_OR_EQUAL, GREATER, NO_OP] 가 있습니다.

ByteArrayComparable 는 추상 클래스로 compareTo 추상 메서드를 가지고 있습니다.

 

 

4) 로우 필터 - RowFilter

 

로우 필터는 로우 키 기반으로 데이터를 필터링 할 수 있도록 제공하고 있습니다.

 

아래는 예제 코드입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));

Scan scan = new Scan();
scan.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-0"));

Filter filter1 = new RowFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("row-22")));
scan.setFilter(filter1);
ResultScanner scanner1 = hTable.getScanner(scan);
scanner1.forEach(System.out::println);
scanner1.close();

Filter filter2 = new RowFilter(CompareFilter.CompareOp.EQUAL, new RegexStringComparator(".*-.5"));
scan.setFilter(filter2);
ResultScanner scanner2 = hTable.getScanner(scan);
scanner2.forEach(System.out::println);
scanner2.close();

Filter filter3 = new RowFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator("-5"));
scan.setFilter(filter3);
ResultScanner scanner3 = hTable.getScanner(scan);
scanner3.forEach(System.out::println);
scanner3.close();

 

위 예제에서는 아래 필터들을 사용하는 것을 볼 수 있습니다.

 

  • filter1 : 지정한 로우키에 대해서 사전편찬식으로 저장되는 로우들을 이용하여 필터
  • filter2 : 정규표현식을 이용하여 필터
  • filter3 : 부분 문자열을 이용하여 필터.

 

5) 패밀리 필터 - FamilyFilter

 

패밀리 필터의 경우 로우 필터와 동작방식이 비슷하지만 로우 키가 아닌 로우 안의 컬럼패밀리를 대상으로 비교합니다.

 

아래는 예제 코드입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));

Filter filter1 = new FamilyFilter(CompareFilter.CompareOp.LESS, new BinaryComparator(Bytes.toBytes("colfam3")));

Scan scan = new Scan();
scan.setFilter(filter1);
ResultScanner scanner = hTable.getScanner(scan);
scanner.forEach(System.out::println);
scanner.close();

Get get1 = new Get(Bytes.toBytes("row-5"));
get1.setFilter(filter1);
Result result1 = hTable.get(get1);
System.out.println("Result of get(): " + result1);

Filter filter2 = new FamilyFilter(CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes("colfam3")));
Get get2 = new Get(Bytes.toBytes("row-5"));
get2.addFamily(Bytes.toBytes("colfam1"));
get2.setFilter(filter2);
Result result2 = hTable.get(get2);
System.out.println("Result of get(): " + result2);

 

위 예제 볼 수 있듯이 Filter는 Get, Scan 두개에 setFilter 메서드를 통해 적용 가능합니다.

 

또한, 컬럼 패밀리도 사전 편찬식으로 저장되는것을 이용하는것을 볼 수 있습니다.

 

 

6) 퀄리파이어 필터 - QualifierFilter

 

퀄리파이어 필터는 말 그대로 퀄리파이어를 대상으로 비교하는 필터입니다.

 

아래는 예제 코드입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));

Filter filter = new QualifierFilter(CompareFilter.CompareOp.LESS_OR_EQUAL, new BinaryComparator(Bytes.toBytes("col-2")));
        
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = hTable.getScanner(scan);
scanner.forEach(System.out::println);
scanner.close();
        
Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = hTable.get(get);
System.out.println("Result of get() : " + result);

 

7) 값 필터

 

이번에는 값을 대상으로 비교하는 필터입니다.

 

아래는 예제입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));

Filter filter = new ValueFilter(CompareFilter.CompareOp.EQUAL, new SubstringComparator(".4"));
        
Scan scan = new Scan();
scan.setFilter(filter);
ResultScanner scanner = hTable.getScanner(scan);
scanner.forEach(result -> {
    Arrays.asList(result.rawCells()).forEach(System.out::println);
});
scanner.close();
        
Get get = new Get(Bytes.toBytes("row-5"));
get.setFilter(filter);
Result result = hTable.get(get);
Arrays.asList(result.rawCells()).forEach(System.out::println);

 

 

8) 의존 컬럼 필터 - DependentColumnFilter

 

의존 컬럼 필터의 경우 단순 필터링이 아닌 더 복잡한 필터 기능을 제공합니다.

이 필터는 다른 컬럼이 필터링될지 여부를 결정하는 의존 컬럼을 지정합니다.

 

아래는 의존 컬럼 필터의 생성자 입니다.

 

DependentColumnFilter(
    final byte [] family, 
    final byte[] qualifier,
    final boolean dropDependentColumn, 
    final CompareOperator op,
    final ByteArrayComparable valueComparator
)

 

9) 단일 컬럼값 필터 - SingleColumnValueFilter

 

단일 컬럼값 필터는 특정 컬럼과 값에 대해 비교 필터로 사용합니다.

 

생성자는 아래와 같습니다.

 

SingleColumnValueFilter(
  final byte [] family, 
  final byte [] qualifier,
  final CompareOperator op, 
  final byte[] value
)

SingleColumnValueFilter(
  final byte [] family, 
  final byte [] qualifier,
  final CompareOperator op,
  final ByteArrayComparable comparator
)

 

또한 추가로 아래와 같은 메소드를 통해 미세 조정이 가능합니다.

 

boolean getFilterIfMissing()
void setFilterIfMissing(boolean filterIfMissing)
boolean getLatestVersionOnly()
void setLatestVersionOnly(boolean latestVersionOnly)

 

10) 단일 컬럼값 제외 필터 - SingleColumnValueExcludeFilter

 

이 필터는 위의 단일 컬럼값 필터의 반대 기능의 필터로 보시면 됩니다.

 

즉, 생성자에 전달한 참조 컬럼이 결과값에서 제외 되어집니다.

 

 

11) 접두어 필터 = PrefixFilter

 

접두어 필터의 경우 이름 그대로 인스턴스화 할때 지정한 접두어와 일치하는 모든 로우를 반환합니다.

 

아래는 생성자 코드입니다.

 

PrefixFilter(final byte [] prefix)

 

이 필터의 경우 scan에 setting하여 사용할때 유의미하며, 지정한 접두어보다 큰 로우키를 만났을때는 알아서 종료되어

불필요한 탐색작업이 일어나지 않도록 되어 있습니다.

 

이 또한 Hbase의 사전 편찬식으로 정렬되어 있기 때문에 가능한 동작입니다.

 

 

12) 페이지 필터 - PageFilter

 

이 필터를 이용하면 로우 단위로 페이징 기능을 제공합니다.

 

인스턴스를 생성할 때 pageSize 파라미터를 지정하여 페이지당 몇 개의 로우를 반환할지 지정할 수 있습니다.

 

클라이언트 코드에서는 반환 받은 마지막 로우를 기억하고 있다가, 다음 이터레이션을 시작할때 시작 로우로 설정하여 사용할 수 있습니다.

 

이 필터는 시작 로우를 포함하므로 다음 페이징을 할때는 마지막 로우에 0byte를 추가하여 시작 로우로 설정하면 됩니다.

0byte는 증가시킬수 있는 최소한의 값이므로 안전하게 스캔 범위를 재설정할 수 있습니다.
또한 0byte 추가된 로우가 실제로 있더라도 페이지 필터는 시작 로우를 포함하므로 문제가 되지 않습니다.

 

아래는 예제 코드입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));
        
Filter filte = new PageFilter(15);
final byte[] POSTFIX = Bytes.toBytes(0);
        
int totalRows = 0;
byte[] lastRow = null;
while(true) {
    Scan scan = new Scan();
    scan.setFilter(filte);
    
    if(lastRow != null) {
        byte[] startRow = Bytes.add(lastRow, POSTFIX);
        System.out.println("start row : " + Bytes.toStringBinary(startRow));
        scan.setStartRow(startRow);
    }

    ResultScanner scanner = hTable.getScanner(scan);
    int localRows = 0;
    Result result;
    while((result = scanner.next()) != null) {
        System.out.println(localRows++ + ":" + result);
        totalRows++;
        lastRow = result.getRow();
    }
    scanner.close();
    
    if(localRows == 0) break;
}
System.out.println("total rows : " + totalRows);

 

책에는 나와 있지 않지만, 물리주소가 아닌 로우키를 기준으로 페이징을 하여 시점에 따라 페이징 결과값은 달라질 수 있을 것으로 예상되어 집니다.
실제 서비스에서는 이 부분을 고려하여  적용해야 합니다.

 

13) 키 전용 필터 

 

이 필터는 데이터의 키 정보에만 접근하고 값 정보는 사용하지 않는 경우 사용합니다.

 

 

14) 최초 키 전용 필터

 

각 로우에서 Hbase 내부적으로 정렬된 첫번째 컬럼에 접근하는 기능을 제공합니다.

 

이러한 필터는 대체적으로 로우의 갯수를 셀 때 사용하곤 합니다.

 

 

15) 종료 로우 포함 필터 - InclusiveStopFilter

 

보통 필터들의 경우 시작 로우는 포함하나 종료 로우는 포함하지 않습니다.

하지만, 종료로우도 포함하기를 원할때는 이 필터를 사용하면 됩니다.

 

 

16) 타임스탬프 필터 - TimestampsFilter

 

스캔 결과에 버전 단위까지 미세조정하려면 타임스탬프 필터를 사용하면 됩니다.

 

아래는 생성자 코드입니다.

 

TimestampsFilter(List<Long> timestamps)

 

인자로 타임스탬프 리스트를 받는것을 볼 수 있습니다.

 

이 리스트에 포함된 타임스탬프와 동일한 결과만 반환받게 됩니다.

또한 scan의 경우 자체적으로 setTimeRange 메서드를 통해 범위를 지정할 수 있는데 타임스탬프 필터도 적용하게 되면

지정한 범위 안에서 필터에 지정한 버전과 동일한 값들만을 반환하게 됩니다.

 

 

17) 컬럼 개수 제한 필터 - ColumnCountGetFilter

 

로우 당 지정한 최대 개수만큼의 컬럼만 반환받는 필터입니다.

 

생성자는 아래와 같습니다.

 

ColumnCountGetFilter(final int n)

 

로우에서 설정된 최대 갯수만큼의 컬럼이 발견되면 전체 스캔을 중단하기 때문에 그다지 유용한 필터는 아닙니다.

 

 

18) 컬럼 페이지 필터 - ColumnPaginationFilter

 

페이지 필터와 비슷하지만, 이 필터는 한 로우 안의 컬럼을 대상으로 페이징을 제공합니다.

 

아래는 생성자 입니다.

 

ColumnPaginationFilter(final int limit, final int offset)

 

 

19) 컬럼 접두어 필터 - ColumnPrefixFilter

 

이 필터는 컬럼을 대상으로 지정한 접두어가 있는 값들을 필터링합니다.

 

생성자는 아래와 같습니다.

 

ColumnPrefixFilter(final byte [] prefix)

 

20) 스킵 필터 - SkipFilter

 

스킵 필터는 보조 필터로서 다른 필터를 감싼 형태로 동작합니다.

감싸진 필터가 건너뛸 인스턴스에 대한 단서를 제공하면 그 전체 로우를 제외하게 됩니다.

 

 

21) 스캔 중단 필터 - WhileMatchFilter

 

스캔 중단 필터도 보조 필터로서 하나라도 필터링되는 순간 전체 스캔을 중단하도록 합니다.

 

 

22) 필터 리스트 - FilterList

 

위에서 살펴본 필터들을 중첩하여 사용하고 싶을 수 있습니다.

이를 위해, 사용하는것이 필터 리스트입니다.

 

생성자는 아래와 같습니다.

 

FilterList(final List<Filter> filters)
FilterList(final Operator operator)
FilterList(final Operator operator, final List<Filter> filters)

 

filters에는 적용할 필터의 리스트를 의미합니다.

operator는 필터 결과를 어떻게 만들지 지정합니다.

 

아래는 Operator 값 입니다.

 

연산자 설명
MUST_PASS_ALL 모든 필터를 통과한 값만이 결과에 추가 ( = AND)
MUST_PASS_ONE 필터 중 하나라도 통과 한 값은 결과에 추가 ( = OR)

 

생성자 말고도 필터를 추가해야 할때는 아래 메소드를 사용하면 됩니다.

 

void addFilter(Filter filter)
void addFilter(List<Filter> filters)

 

또한, List 의 구현체에 따라 필터의 순서를 정할 수 있습니다.

ArrayList의 경우 담긴 순서대로 필터가 적용되는것을 보장합니다.

 

 

23) 사용자 정의 필터

 

지금까지는 Hbase 클라이언트가 제공하는 필터 종류에 대해 알아보았습니다.

 

하지만, 제공하는 필터가 아닌 사용자가 만든 Custom 한 필터가 필요한 경우가 있습니다.

 

이런경우에는 Filter 혹은 FilterBase 추상 클래스를 상속받아 만들 수 있습니다.

 

 

 

 

 

반응형

 

 

 

 

3. 카운터

Hbase에서는 고급 기능인 카운터 기능도 제공하고 있습니다.

 

1) 카운터 소개

 

카운터 기능이 없다면 개발자는 수동으로 아래와 같은 절차를 만들어야 합니다.

 

  1. 로우 lock
  2. 값 조회
  3. 값 증가 후 write
  4. lock 해제

 

하지만 이러한  절차는 과도한 경합상황을 야기하며,

lock이 잠긴상태로 어플리케이션이 죽게되면 lock 정책에 따른 타임아웃이 끝나기 전까지는 다른 어플리케이션인 접근할 수 없게 됩니다.

 

Hbase에서는 클라이언트 측 호출을 통해서 이러한 절차를 원자적으로 처리할 수 있도록 제공하고 있습니다.

더불어, 한 호출로 여러개의 카운터 갱신 기능도 제공하고 있습니다.

 

아래는 hbase shell 에서 카운터 예제를 수행한 사진입니다.

 

 

incr 은 주어진 인자 만큼 카운터의 값을 증가 시킨 후 반환 받습니다.

get_counter는 현재 카운터 값을 반환합니다.

 

아래는 카운터도 보통의 컬럼 중에 하나라는것을 증명하는 사진입니다.

 

 

추가로 incr의 경우 음수를 인자로 주어 카운터를 감소시킬수도 있습니다.

 

 

2) 단일 카운터

 

Hbase 클라이언트는 단일 카운터만을 대상으로 처리할 수 있도록 제공하고 있습니다.

이때, 정확한 카운터 컬럼을 지정해야 합니다.

 

아래는 HTable 클래스에서 제공하는 메서드입니다.

 

long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount)
long incrementColumnValue(byte[] row, byte[] family, byte[] qualifier, long amount, Durability durability)

 

카운터를 수행할 좌표값(row, family, qualifier) 와 카운터 증감 값을 인자로 받는것을 알 수 있습니다.

단, Durability 인자로 오버로딩이 되어 있습니다.

이 Durability 는 WAL에 반영을 어떻게 할건지에 대한 설정입니다.

 

Durability 는 enum으로 아래와 같은 값들이 정의되어 있습니다.

 

public enum Durability {
  USE_DEFAULT,
  SKIP_WAL,
  ASYNC_WAL,
  SYNC_WAL,
  FSYNC_WAL
}

 

 

3) 복수 카운터

 

카운터를 증가시키는 방법으로는 HTable의 increment 메서드가 있습니다.

 

아래는 increment 메서드 명세입니다.

 

Result increment(final Increment increment)

 

이 메서드를 사용하기 위해서는 Increment 인스턴스를 만들어야 합니다.

Increment 인스턴스에는 카운터 컬럼의 좌표값과 적절한 세부 사항을 넣어야 합니다.

 

아래는 Increment의 생성자입니다.

 

Increment(byte [] row)

 

생성자로 로우키를 먼저 필수로 지정한 뒤 범위를 줄이고 싶을때는 아래와 같은 메서드로 가능합니다.

 

Increment addColumn(byte [] family, byte [] qualifier, long amount)

 

카운터의 경우 버전은 내부적으로 처리하기 때문에 인자에 timestamp를 받는 부분이 없습니다.

또한, addFamily 메서드도 없는데 그 이유는 카운터는 특정 컬럼이기 때문입니다.

 

추가로 시간 범위를 지정하여 읽기 연산의 이점을 얻는 메서드도 제공하고 있습니다.

 

Increment setTimeRange(long minStamp, long maxStamp)

 

아래는 다중 컬럼의 카운터 연산을 하는 예제 입니다.

 

Configuration conf = HBaseConfiguration.create();
Connection connection = ConnectionFactory.createConnection(conf);
Table hTable = connection.getTable(TableName.valueOf("testtable"));

Increment increment1 = new Increment(Bytes.toBytes("young!!"));
increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 1);
increment1.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 10);
increment1.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), 10);

Result result1 = hTable.increment(increment1);
result1.listCells().forEach(System.out::println);

Increment increment2 = new Increment(Bytes.toBytes("young!!"));
increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("clicks"), 5);
increment2.addColumn(Bytes.toBytes("daily"), Bytes.toBytes("hits"), 1);
increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("clicks"), 0);
increment2.addColumn(Bytes.toBytes("weekly"), Bytes.toBytes("hits"), -5);

Result result2 = hTable.increment(increment2);
result2.listCells().forEach(System.out::println);

 

Increment를 통해서 복수개의 컬럼에 카운터를 처리하는 것을 볼 수 있습니다.

 

4. 보조 처리기

Hbase에서는 보조 처리기라는 것을 이용하여 계산 작업의 일부를 리전 서버에 전가시키는 방법을 제공하고 있습니다.

앞절까지는 조회하는 데이터를 제한한 후 클라이언트에서 비즈니스 로직을 통해 계산했었습니다.

 

1) 보조 처리기 소개

 

Hbase에서 제공하는 보조 처리기는 임의의 코드를 각 리전 서버에서 직접 실행하게 해줍니다.

 

이 보조처리기의 경우 필터처럼 인터페이스를 구현하여 사용자 정의 보조 처리기를 만들 수 있습니다.

구현한 코드는 컴파일하여 jar 형태로 Hbase에 전달하면 됩니다.

이 보조 처리기는 필터와는 달리 동적으로 로드할 수 있어 Hbase ㅋ,ㄹ러스터의 기능성을 쉽게 확장할 수 있습니다.

 

보조 처리기를 구현하여 사용할 시 아래 두 클래스를 사용하면 됩니다.

 

1. 옵저버

 

특정 이벤트가 발생 시 콜백메서드를 수행하는 클래스입니다.

 

Hbase에서 제공하는 옵저버 인터페이스 종류는 아래와 같습니다.

org.apache.hbase:hbase-endpoint 를 추가해야 합니다.

 

  • RegionObserver = 데이터 조작 이벤트를 처리하는 옵저버, 테이블이 위치한 리전과 밀접하게 연관
  • MasterObserver = 관리 또는 DDL 유형의 동작에 반응하는 옵저버, 클러스터 전반에 걸친 이벤트를 처리
  • WALObserver = WAL 처리에 대한 콜백을 제공합니다.

 

2. 엔드포인트

 

엔드포인트는 RPC를 동적으로 확장하여 콜백 원격 절차를 추가한 것으로, RDBMS에서 프로시져로 이해하면 됩니다.

 

이 엔드포인트는 옵저버와 결합하여 사용할 수 있으며 모두 Coprocessor 인터페이스를 상속하고 있습니다.

 

 

2) Coprocessor 인터페이스

 

모든 보조 처리기는 이 인터페이스를 구현해야 합니다.

 

이 인터페이스에는 enum으로 Pritority, State를 제공하고 있습니다.

 

아래는 Pritority 값에 대한 설명입니다.

 

설명
SYSTEM 우선 순위가 가장 높으며 가장 먼저 실행되어야 하는 보조 처리기
USER SYSTEM 우선순위 값을 가진 보조처리기가 실행된 다음에 실행

 

보조 처리기는 각자 생명주기가 있고, 프레임워크에서 관리하게 됩니다.

 

Coprocessor 는 생명주기에 대해 상요자 정의를 제공하기 위해 아래와 같은 두 메서드를 제공합니다.

 

void start(CoprocessorEnvironment env) throws IOException
void stop(CoprocessorEnvironment env) throws IOException

 

이 메서드들은 보조처리기가 시작할때와 끝날때 호출되어집니다.

인자로 받는 CoprocessorEnvironment는 인스턴스의 생명주기 전체에 걸쳐 상태를 저장하는데 사용되어 집니다.

 

이제 두번째 enum State는 바로 이런 보조처리기의 생명주기의 상태값을 의미합니다.

 

설명
UNINSTALLED 보조 처리기가 최초 상태에 있음. 아직 환경을 갖지 않았고 초기화 되지 않음.
INSTALLED 인스턴스가 환경안에 설치되었음
STARTING 보조 처리기 시작 직전 상태, 즉 보조 치리기의 start 메서드가 실행되기 직전 상태 
ACTIVE start 메서드 호출에 대한 응답이 반환된 상태
STOPPING stop 메서드가 실행되기 직전 상태
STOPPED stop 메서드가 호출에 대한 응답이 반환된 상태

 

마지막으로 CoprocessorHost 클래스가 있습니다.

이 클래스는 보조 처리기의 호스트가 어디에서 사용되는지에 따라 하위 클래스가 구분됩니다.

호스트의 경우 마스터 서버, 리전 서버를 의미합니다.

 

아래는 클라이언트에서 요청한 연산이 어떻게 보조처리기에 적용되는지를 보여주는 그림입니다.

 

 

 

3) 보조 처리기 로드

 

보조 처리기는 정적, 동적 모두 로드 되도록 할 수 있습니다.

 

1. 설정 파일에 의한 로드

 

Hbase가 시작할 때 로드할 보조 처리기를 전역적으로 설정 할 수 있습니다.

 

방법은 아래와 같이 hbase-site.xml 을 수정하면 됩니다.

 

<property>
    <name>hbase.coprocessor.region.classes</name>
    <value>coprocessor.RegionObserverExample, coprocessor.AnotherCoprocessor</value>
</property>
<property>
    <name>hbase.coprocessor.master.classes</name>
    <value>coprocessor.MasterObserverExalple</value>
</property>
<property>
    <name>hbase.coprocessor.wal.classes</name>
    <value>coprocessorWALObserverExample, bar.foo.MyWALObserver</value>
</property>

 

각 속성에 있는 처리기 순서가 실행 순서가 됩니다.

또한, 여기에 정의된 보조처리기는 모두 시스템 수준 우선순위를 가지고 로드 됩니다.

 

 

2. 테이블 지시자에 의한 로드

 

테이블 지시자로도 로드할 수 있습니다.

테이블 지시자의 경우 테이블 단위로 할당 되는것으로 이곳에 정의된 보조 처리기는 해당 테이블이 속한 리전 및 리전 서버에서만 로드됩니다.

 

리전에서만 로드 되는것으로 Master, WAL 처리기는 사용할 수 없습니다.

 

테이블 지시자는 setValue를 통해 아래와 같이 보조 처리기의 정의를 추가해야 합니다.

key는 COPROCESSOR로 시작해야하며, value는 <jar_파일_경로>|<클래스_이름>|<우선순위> 의 포맷이여야 합니다.

 

jar 파일 경로의 경우 hdfs의 경로를 사용할 수 있습니다.

 

아래는 쉘을 통해 처리기를 추가하는 예제입니다.

'COPROCESSOR$1' => '/Users/laura/test2.jar|coprocessor.AnotherTest|USER'

 

마지막 우선순위에는 위에서 언급한 SYSTEM, USER 중에 하나를 지정합니다.

 

위에서 $를 사용하여 처리기가 로드되는 순서를 지정할 수 있습니다.

 

 

4) RegionObserver 클래스

 

RegionObserver 클래스는 리전 수준에서 사용되는 클래스로서, 리전 안에서 발생하는 특정 동작에 따라 발동하는 hook 이 있습니다.

 

hook을 발동시키는 동작은 두개로 아래와 같이 나눌 수 있습니다.

 

  • 리전 생명 주기 변경
  • 클라이언트 API 호출

1. 리전 생명 주기 변경

 

옵저버는 아래 그림과 같이 [열리기 전, 열림, 닫히기 전] 상태 변경에 반응할 수 있습니다.

 

 

열리기 전 상태

 

리전이 열리기 직전에 이 상태가 됩니다.

옵저버는 아래 메서드를 통해 리전을 열기 직전과 열린 직후에 프로세스에 영향을 줄 수 있습니다.

 

void preOpen()
void postOpen()

 

추가로 리전이 열리기 전 상태가 지나고 열림 상태 직전에 WAL에 있는 정보를 사용할 수도 있습니다.

이런경우를 위해 아래와 같은 메서드 hook 이 있습니다.

 

void preWALRestore()
void postWALRestore()

 

열림 상태

 

리전이 리전 서버에 배치되고 완전히 동작 가능한 상태입니다.

 

아래는 각 flush, compact, split 에 대한 동작에 대한 hook을 제공하는 메서드입니다.

 

void preFlush()
void postFlush()
void preCompact()
void postCompact()
void preSplit()
void postSplit()

 

닫히기 전 상태

 

리전 닫기가 임박한 상태입니다.

 

아래와 같은 메서드로 리전이 닫히기 직전과 직후에 대해서 핸들링할 수 있습니다.

 

void preClose(..., boolean abortRequested)
void postClose(..., boolean abortRequested)

 

abortRequested 인자는 리전이 닫히는 이유입니다.

일반적으로는 로드밸런싱을 위해 리전이 분할할 때 닫히게 되지만 일부 오작동으로 인해 닫히는 경우가 있기 때문입니다.

 

 

2. 클라이언트 API 이벤트 처리

 

클라이언트 API는 모두 명시적으로 리전 서버로 전달됩니다.

보조 처리기는 이 API 메서드가 실행되기 직전과 직후에 대한 hook을 제공합니다.

 

아래는 제공 hook 입니다.

 

  • void preGet / void postGet
  • void prePut /  void postPut
  • void preDelete / void postDelete
  • boolean preCheckAndPut / boolean postCheckAndPut
  • boolean preCheckAndDelete / boolean postCheckAndDelete
  • void preGetClosestRowBefore / void postGetClosestRowBefore
  • boolean preExists / boolean postExists
  • long preIncrementColumnValue / long postIncrementColumnValue
  • void preIncrement / void postIncrement
  • InternalScanner preScannerOpen / InternalScanner postScannerOpen
  • boolean preScannerNext / boolean postScannerNext
  • void preScannerClose / void postScannerClose

 

3. ResionCoprocessorEnvironment 클래스

 

RegionObserver 인터페이스를 구현하는 인스턴스는 RegionCoprocessorEnvironment 클래스를 상속하게 됩니다.

 

RegionCoprocessorEnvironment  클래스는 이름 그대로 리전에 환경 정보를 담당하는 클래스로 아래와 같은 메서드가 지원됩니다.

 

  • getRegion() = 현재 옵저버가 연관된 리전에 대한 참조 반환
  • getRegionserverServices() = 공유자원인 RegionServerServices 인스턴스에 대한 접근 제공

 

4. ObserverContext 클래스

 

ObserverContext는 옵저버 인스턴스의 현재 환경에 대한 접근을 제공하며,

콜백 메서드의 수행이 완료한 뒤에 보조 처리기 프레임워크가 무엇을 할지 지정할 수 있는 기능을 제공합니다.

 

아래는 제공하는 기능 중 중요한 두가지입니다.

 

  • bypass = 보조 처리기의 연쇄적 실행 흐름에 영향 -> 다음 보조 처리기를 타지 않게 합니다.
  • complete = 서버 측 프로세스를 중지

 

5) MasterObserver 클래스

 

MasterObserver 클래스는 마스터 서버가 호출 할 수 있는 콜백 메서드를 처리하는 기능을 제공합니다.

 

아래는 MasterObserver hook 종류입니다.

 

  • void preCreateTable / void postCreateTable
  • void preDeleteTable / void postDeleteTable
  • void preModifyTable / void postModifyTable 
  • void preAddColumn / void postAddColumn
  • void preModifyColumn / void postModifyColumn
  • void preDeleteColumn / void postDeleteColumn
  • void preEnableTable / void postEnableTable
  • void preDisableTable / void postDisableTable
  • void preMove / void postMove
  • void preAssign / void postAssign
  • void preUnassign / void postUnassign
  • void preBalance / void postBalance
  • boolean preBalanceSwitch / void postBalanceSwitch
  • void preShutdown
  • void preStopMaster

 

6) 엔드포인트 보조 처리기

 

하나의 리전에서만이 아닌 모든 리전에서 각자 어떠한 동작을 수행 후 취합하기를 원하는 경우가 있습니다.

하지만 지금까지 알아본 가능으로는 위와같이 할 수 없습니다.

할 수 있더라도 아마 모든 테이블을 스캔하는 동작이기에 성능상 안좋습니다.

 

HBase에서는 이러한 문제를 위해 엔드포인트 보조 처리기를 제공합니다.

 

1. CoprocessorProtocol

 

클라이언트에게 사용자 정의 RPC 프로토콜을 제공하려면 CoprocessorProtocol을 상속하는 인터페이스를 정의해야 합니다.

 

이 프로토콜을 사용하면 HTable이 제공하는 아래 메서드를 통해 보조 처리기 인스턴스와 통신할 수 있습니다.

 

<T extends CoprocessorProtocol> T coprocessorProxy(Class<T> protocol, byte[] row)
<T extends CoprocessorProtocol, R> Map<byte[], R> coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable)
<T extends CoprocessorProtocol, R> void coprocessorExec(Class<T> protocol, byte[] startKey, byte[] endKey, Batch.Call<T, R> callable, Batch.Call<T, R> callback)

 

위의 메서드에서 알 수 있듯이 CoprocessorProtocol 인스턴스는 리전과 연동되기 때문에, 클라이언트에서는 미리 어떤 리전에서 실행되어야 하는지 알아야하는 단점이 있습니다.

 

 

2. BaseEndPointCoprocessor 클래스

 

엔드포인트 보조 처리기를 구현하기 위해서는 위의 CoprocessorProtocol 뿐만이 아니라 BaseEndPointCoprocessor 클래스도 확장해야 합니다.

 

CoprocessorProtocol는 클라이언트와 서버의 RPC 프로토콜을 정의한거라면 BaseEndPointCoprocessor 는 실제 처리를 정의하는 구현체입니다.

 

아래는 엔드포인트 보조 처리기를 통해 호출되는 과정을 그림으로 나타낸 것입니다.

 

 

 

출처: 라스조지 [Hbase 완벽가이드] 한빛미디어 2013년 297p

 

5. HTablePool

HTable을 계속 생성하는것은 큰 오버헤드를 줄 수 있습니다.

그 이유는 HTable 인스턴스를 생성하는데 비용이 생각보다 크기 때문입니다.

 

HTable을 여러 쓰레드가 공유해서 쓰는것도 불가능 합니다. 이유는 HTable은 thread-safe 하지 않기 때문입니다.

 

때문에 HTablePool 클래스를 통해 해결해야 합니다.

 

HTablePool 클래스는 오직 클라이언트 API 인스턴스를 풀링하는 목적으로 만들어 졌으며,

생성자 명세는 아래와 같습니다.

 

HTablePool()
HTablePool(Configuration config, int maxSize)
HTablePool(Configuration config, int maxSize, HTableInterfaceFactoey tableFactory)

 

maxSize는 관리할 인스턴스 수 이고 tableFactory는 인스턴스를 생성할 팩토리 클래스입니다.

 

HTablePool은 테이블 단위로 풀을 관리하며 사용시에는 아래와 같은 메서드를 이용하면 됩니다.

 

HTableInterface getTable(String tableName)
HTableInterface getTable(byte[] tableName)
void putTable(HTableInterface table)

 

 

아래는 풀을 닫을 때 사용하는 메서드입니다.

 

void closeTablePool(String tableName)
void closeTablePool(byte[] tableName)

 

 

6. 연결처리

HBase 클라이언트는 인스턴스 생성 시 리전 정보를 가져오기 위해 주키퍼와 연결을 맺고, 클라이언트 쪽에 캐싱을 해두고 사용합니다.

내부적으로 캐싱된 정보로 요청시 리전을 못찾는 오류의 경우에는 주키퍼에 다시 한번 콜하여 캐시정보를 갱신합니다.

 

주키퍼에 대한 연결은 사용자가 관리해야하며 무작위로 연결을 증가하다보면, 가능한 연결 갯수를 넘어 IOException이 발생 할 수 있습니다.

 

때문에, 사용자는 작업이 끝나면 HTable의 close메서드를 호출해야합니다.

 

7. 마무리

 

이번 포스팅에서는 클라이언트 API : 고급 기능에 대해 진행하였습니다.

 

다음 포스팅에서는 챕터 5장인 클라이언트 API : 관리 기능에 대해 진행하겠습니다.

반응형

'BigData > Hbase' 카테고리의 다른 글

(5) 클라이언트 API : 관리 기능  (0) 2020.06.02
(3) 클라이언트 API : 기본 기능  (0) 2020.04.09
(2) 설치  (0) 2020.04.08
(1) 소개  (0) 2020.04.07
반응형

1. 서론

이번 포스팅에서는 Hbase가 제공하는 클라이언트 API 중 기본적인 기능에 대해 알아보겠습니다.

 

2. 일반 정보

Hbase에 접근하는 주요 클라이언트 인터페이스는 org.apache.hadoop.hbase.client 패키지에 있는 HTable 입니다.

 

HTable은 Hbase에 데이터를 저장, 삭제의 일을 할 수 있도록 제공합니다.

 

단, HTable을 사용할 시 주의할 점이 있습니다.

 

HTable은 인스턴스화될 시 메타(.META.) 테이블을 스캔하여 테이블이 있는지, 있다면 활성화 되있는지를 확인하고

그 외에도 부수적인 작업들을 수행하기 때문에 인스턴스화 작업은 느립니다.

 

따라서, HTable 인스턴스화는 되도록 한번만 수행하며 스레드당 하나씩만 생성하도록 하는것이 좋습니다.

그리고, 한번 생성된 HTable는 어플리케이션이 끝날때까지 재사용하는 편이 성능상 이점을 볼 수 있습니다.

 

3. CRUD 기능

HTable의 CRUD 역할에 대해 소개하겠습니다.

 

1) Put 메소드

 

데이터를 적재할 때 사용하는 API입니다.

단일로우와 멀티로우를 적재할 수 있도록 모두 제공하고 있습니다.

 

 

1. 단일 Put

 

put 메서드의 명세는 아래와 같습니다.

 

void put(Put put) throws IOException

 

인자로 받는 Put은 하나 또는 리스트 형태로 받을 수 있습니다.

 

아래는 Put의 생성자 메서드 명세입니다.

 

Put(byte[] row)
Put(byte[] row, RowLock rowLock)
Put(byte[] row, long ts)
Put(byte[] row, long ts, RowLock rowLock)

 

Hbase의 로우는 고유한 키입니다.

 

이 키는 타입이 자바의 바이트 배열로 어떠한 값이 들어와도 상관이 없습니다.

단, 로우 키는 사전편찬식으로 정렬된다는 것을 명심하고 키설계를 하여 사용해야 합니다.

 

아래는 어떠한 타입이든 바이트 배열로 만들어주는 헬퍼 클래스인 Bytes의 메서드입니다.

 

static byte[] toBytes(ByteBuffer bb)
static byte[] toBytes(String s)
static byte[] toBytes(boolean b)
static byte[] toBytes(long val)
static byte[] toBytes(float f)
static byte[] toBytes(int val)

 

Put 인스턴스를 생성 한 다음에는 컬럼패밀리, 퀄리파이어, 값, 타임스탬프 등을 추가할 수 있습니다.

 

아래는 Put의 데이터 추가 메서드입니다.

 

Put add(byte[] family, byte[] qualifier, byte[] value)
Put add(byte[] family, byte[] qualifier, long ts, byte[] value)
Put add(KeyValue kv) throws IOException

 

add 메서드를 한번 수행할 때마다 하나의 컬럼이 추가됩니다.

 

KeyValue 클래스의 경우에는 하나의 고유한 셀을 나타내는 고급 클래스입니다.

 

이 클래스를 얻기 위해서는 get 메서드를 통해 얻을 수 있습니다.

 

아래는 KeyValue를 가져오는 get 메서드입니다.

 

List<KeyValue> get(byte[] family, byte[] qualifier)
Map<byte[], List<KeyValue>> getFamilyMap()

 

특정 셀이 존재하는지 알아볼때에는 아래와 같이 get이 아닌 has 메서드를 제공합니다.

 

boolean has(byte[] family, byte[] qualifier)
boolean has(byte[] family, byte[] qualifier, long ts)
boolean has(byte[] family, byte[] qualifier, byte[] value)
boolean has(byte[] family, byte[] qualifier, long ts, byte[] value)

 

아래는 put을 통해 데이터를 적재하는 간단한 예제입니다.

 

public class PutExample {

    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();

        HTable hTable = new HTable(conf, "testtable");

        Put put = new Put(Bytes.toBytes("row1"));
        
        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
        put.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val2"));

        hTable.put(put);
    }
}

 

위의 Configuration은 org.apache.hadoop.conf 패키지에 존재하며 설정 정보를 주입하는 역할입니다.

 

기존 설정정보는 hbase-site.xml에 기입해야하지만, 동적으로 변경해야 할때는  Configuration를 사용하면 됩니다.

 

아래는 주키퍼 쿼럼 정보를 동적으로  Configuration를 사용하여 세팅하는 예제입니다.

 

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "zk1.foo.com,zk2.foo.com,");

 

put의 경우 타임스탬프를 부여하여 각 셀에 대하여 버저닝을 할 수도 있습니다.

부여하지 않을 시에는 로우가 저장소에 추가되는 순간에 해당 리전서버의 시각으로 자동 부여가 됩니다.

 

Hbase의 경우, 셀은 타임스탬프값을 기준으로 정렬되어져 저장됩니다.

 

 

2. KeyValue 클래스

 

코드상에서 KeyValue 인스턴스를 처리해야 하는 경우가 종종 있습니다.

때문에, KeyValue 클래스에 대해 간단히 살펴보겠습니다.

 

우선, KeyValue 클래스는 특정 셀의 정보를 가지고 있습니다.

특정 셀의 정보는 로우 키, 컬럼패밀리, 컬럼 퀄리파이어, 타임스탬프를 의미합니다.

 

메서드로는 아래와 같이 있습니다.

 

// 아래 3개는 KeyValue 인스턴스에 저장되어 있는 전체 바이트 배열에 관한 메서드입니다.
byte[] getBuffer()
int getOffset()
int getLength()

// 아래 2개는 로우키와 데이터가 저장된 워시 좌표 정보의 바이트 배열을 반환하는 메서드입니다.
byte[] getRow()
byte[] getKey()

 

또한, KeyValue 클래스는 내부적으로 Comparator를 통해 값에 대해서 커스텀하게 정렬을 할 수 있도록 제공합니다.

 

아래는 KeyValue 클래스에서 제공하는 Comparator 종류입니다.

 

비교 연산자 설명
KeyComparator KeyValue 2개의 키를 비교합니다.
즉, getKey 메서드를 통해 원시 바이트 배열을 비교합니다.
KVComparator 원시 형태의 KeyComparator를 감싼 형으로서,
KeyValue 2개를 비교할 때 사용합니다.
RowComparator getRow 로 얻은 값으로 비교합니다.

 

KeyValue 클래스는 type이라는 필드를 가지고 있습니다.

이 type은 KeyValue에 하나의 차원이 더 추가되는것과 같습니다.

 

type 에 사용가능한 값은 아래와 같습니다.

 

유형 설명
Put 해당 KeyValue 인스턴스가 일반적인 Put 연산임을 의미합니다.
Delete 해당 KeyValue 인스턴스가 일반적인 Delete 연산임을 의미합니다.
DeleteColumn Delete와 같지만, 더 광범위하게 전체 컬럼을 삭제한다는 의미입니다.
DeleteFamily Delete와 같지만, 더 광범위하게 전체 컬럼패밀리 및 그에 속한 모든 컬럼을 삭제한다는 의미입니다.

 

마지막으로 KeyValue에 정보를 출력하고 싶으실때는 아래 메서드를 사용하시면 됩니다.

 

String toString()

 

아래는 toString 메서드의 결과 형식을 나타냅니다.

 

<row-key>/<family>:<qualifier>/<version>/<type>/<value-length>

 

 

3. 클라이언트 측 쓰기 버퍼

 

Hbase의 경우 쓰기 연산은 RPC를 통해 이루어 집니다.

 

이 RPC는 remote procedure call로서 갯수가 적을때는 괜찮지만 초당 수천개의 값을 테이블에 저장하는 데에는 적합하지 않습니다.

 

그로인해 Hbase 클라이언트는 내부적으로 쓰기 버퍼를 두어 한번의 RPC로 서버에 데이터를 전송하도록 제공하고 있습니다.

 

쓰기버퍼의 사용여부는 아래와 같이 설정할 수 있습니다.

현재 포스팅을 작성하는 때에는 아래와 같이 버퍼 관련은 deprecated 되었습니다. -> 최신 버전에서는 아예 메서드에서 제외되었습니다.
대신, BufferedMutator 클래스를 별도로 제공하여 아래와 같은 기능을 제공하고 있습니다.

지금은 책내용을 기반으로 진행하도록 하겠습니다.

 

void setAutoFlush(boolean autoFlush)
boolean isAutoFlush()

 

디폴트는 비활성 상태입니다.

활성으로 하기 위해서는 아래와 같이 setAutoFlush를 false로 설정합니다.

 

table.setAutoFlush(false)

 

위와 같이 false로 설정된 후부터는 put 메서드를 호출하더라도 클라이언트측 메모리 버퍼에 저장되고 서버로는 실제 전송이 이루어 지지 않습니다.

 

서버로 RPC를 날리기 위해서는 아래와 같은 메서드를 호출하면 됩니다.

 

void flushCommits() throws IOException

 

쓰기 버퍼의 경우 클라이언트에서 내부적으로 알아서 비워주기 때문에 크게 고려하지 않아도 됩니다.

 

추가로, 쓰기 버퍼의 크기도 조정이 가능한데 방법은 아래와 같습니다.

 

long getWriteBufferSize()
long setWriteBufferSize(long writeBufferSize) throws IOException

 

물론, 명시적으로 버퍼를 비울수도 있습니다.

 

개발자가 작성한 코드로 인해서 버퍼가 비워지는 경우는 아래와 같습니다.

 

  • flushCommits 호출 시
  • autoFlush가 true의 경우 put 메서드 호출 시
  • setWriteBufferSize 호출 시

 

아래는 쓰기 버퍼를 사용한 예제 입니다.

 

    public static void main(String[] args) {
        Configuration conf = HBaseConfiguration.create();
        HTable hTable = new HTable(conf, "testtable");
        
        System.out.println("Auto flush: " + hTable.isAutoFlush());

        hTable.setAutoFlush(false);
        
        
        Put put1 = new Put(Bytes.toBytes("row1"));
        put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
        hTable.put(put1);

        Put put2 = new Put(Bytes.toBytes("row2"));
        put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val2"));
        hTable.put(put2);

        Put put3 = new Put(Bytes.toBytes("row3"));
        put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val3"));
        hTable.put(put3);

        Get get = new Get(Bytes.toBytes("row1"));
        Result res1 = hTable.get(get);
        System.out.println("Result: " + res1);
        
        hTable.flushCommits();
        
        Result res2 = hTable.get(get);

        System.out.println("Result: " + res2);
    }

 

위 결과 출력은 아래와 같습니다.

 

Auto flush: true
Result: keyvalues=None
Result: keyvalues={ro1/colfam1/qual1/123412358/Put/vlen=4}

 

위에서 설명한것과 같이 버퍼에 쓰기만 하고 서버로 PRC를 하지 않았기 때문에

첫번째 Get에서는 None이 나온것을 볼 수 있습니다.

 

만약 버퍼에 있는 데이터를 보기 위해서는 아래 메서드를 사용하면 됩니다.

 

ArrayList<Put> getWriteBuffer()

 

하지만 이 방법은 멀티쓰레드 환경에서 조심해야 합니다.

이유로는 List에 접근 시 힙 크기를 확인하지 않고 접근하며, 또 버퍼 비우기가 진행되는 도중 다른 쓰레드가 값을 변경할 수 있기 때문입니다.

 

 

4. Put 리스트

 

클라이언트 API는 단일 put이 아닌 List<put>도 처리 가능하도록 제공합니다.

 

void put(List<Put> puts) throws IOException

 

위 메서드를 사용하면 List<Put>으로 데이터 적재가 가능합니다.

 

다만, List에 있는 모든 Put이 성공하지 않을 수 있습니다.

성공하지않은 Put이 있다면 클라이언트는 IOException을 받게됩니다.

 

하지만, Hbase는 List에 있는 put을 이터레이트돌며 적용하기 때문에 하나가 실패한다고 안에 있는것이 모두 실패하지는 않습니다.

 

실패한 Put은 쓰기버퍼에 남아있게 되고 다음 flush 작업에서 재수행하게 됩니다.

만약 데이터가 잘못되어 실패되는 케이스라면 계속 버퍼에 남게되어 재수행을 반복하게 될 것입니다.

 

이를 방지하기 위해서는 수동을 버퍼를 비워줘야 합니다.

 

아래는 일괄 put을 날린 후 try-catch를 통해 실패가 발생하게 있다면 명시적으로 버퍼를 비우는 예제 코드입니다.

 

List<Put> puts = new ArrayList<>();
Put put1 = new Put(Bytes.toBytes("row1"));
put1.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual1"), Bytes.toBytes("val1"));
puts.add(put1)

Put put2 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("BOGUS"), Bytes.toBytes("qual1"), Bytes.toBytes("val2"));
puts.add(put2)

Put put3 = new Put(Bytes.toBytes("row2"));
put2.add(Bytes.toBytes("colfam1"), Bytes.toBytes("qual2"), Bytes.toBytes("val3"));
puts.add(put3)

Put put4 = new Put(Bytes.toBytes("row2"));
puts.add(put4)
try {
    hTable.put(puts);
} catch (Exception e) {
    hTable.flushCommits();    
}

 

추가로, 리스트 기반의 입력시에는 Hbase 서버에서 입력 연산의 순서가 보장되지 않습니다.

 

 

5. 원자적 확인 후 입력 연산

 

Hbase에서는 원자적 확인 후 입력이라는 특별한 기능을 제공합니다.

 

이 기능은 특정한 조건에 만족하는 경우 put 연산을 수행할 수 있도록합니다.

반환값으로는 boolean 값으로 put 연산이 수행되었는지의 여부를 의미합니다.

 

사용법으로는 아래와 같습니다.

최신 버전에서는 deprecated 되어 있습니다.

 

boolean checkAndPut(byte[] row, byte[] family, byte[] qualifier, byte[] value, Put put) throws IOException 
boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException
boolean checkAndPut(final byte [] row, final byte [] family, final byte [] qualifier, final CompareOp compareOp, final byte [] value, final Put put) throws IOException

 

 

2) Get 메서드

 

Hbase 클라이언트는 데이터를 읽어오는 Get 메서드를 제공합니다.

 

 

1. 단일 Get

 

특정 값을 반환받는데 사용하는 메서드입니다.

 

Result get(Get get) throws IOException

 

put과 유사하게 get 메서드도 전용 Get 클래스를 인자로 받고 있습니다.

 

Get 클래스의 생성자 메서드는 아래와 같습니다.

 

Get(byte[] row)
Get(byte[] row, RowLock rowLock)

 

아래는 한 로우에대해서 읽는 데이터 범위를 줄이기위해 필요한 보조적인 메서드들입니다.

 

Get addFamily(byte[] family)
Get addColumn(byte[] family, byte[] qualifier)
Get setTimeRange(long minStamp, long maxStamp) throws IOException
Get setTimeStamp(long timestamp)
Get setMaxVersions()
Get setMaxVersions(int maxVersions) throws IOException

 

읽어올때는 위에서 소개한 Bytes 헬퍼 클래스를 통해 byte[]을 원하는 데이터 타입으로 변환이 가능합니다.

 

static String toString(byte[] b)
static boolean toBoolean(byte[] b)
static long toLong(byte[] b)
static float toFloat(byte[] b)
static int toInt(byte[] b)

 

 

 

 

반응형

 

 

 

 

 

2. Result 클래스

 

get 메서드의 반환 타입은 Result 클래스입니다.

 

Result 클래스는 Hbase 데이터인 컬럼패밀리, 퀄리파이어, 타임스탬프, 값 등을 모두 가지고 있으며 내부 메서드로 제공하고 있습니다.

 

byte[] getValue(byte[] family, byte[] qualifier) // 특정 셀 값
byte[] value() // 사전 편찬식으로 정렬된 KeyValue 중에 첫번째 값
byte[] getRow() // 로우키
int size() // KeyValue 갯수
boolean isEmpty() // KeyValue 존재 여부
KeyValue[] raw() // KeyValue 접근을 위한 메서드
List<KeyValue> list() // raw에서 반환되는 KeyValue 배열을 단순히 list 형식으로 바꿔서 반환하는 메서드

 

아래는 부가적으로 컬럼단위로 제공하는 메서드입니다.

 

List<KeyValue> getColumn(byte[] family, byte[] qualifier)
KeyValue getColumnLatest(byte[] family, byte[] qualifier)
boolean containsColumn(byte[] family, byte[] qualifier)

 

아래는 Result의 데이터에 접근을 용이하게 제공하는 Map 지향적인 메서드입니다.

 

NavigableMap<byte[], NavigableMap<byte[], NavigableMap<Long, byte[]>>> getMap()
NavigableMap<byte[], NavigableMap<byte[], byte[]>> getNoVersionMap() // 최근 값만을 반환
NavigableMap<byte[], byte[]> getFamilyMap(byte [] family)

 

위와 같은 메서드는 상황에 따라 적절히 선택하여 사용하면 되며,

이미 Result 인스턴스는 서버에서 클라이언트로 데이터를 받았기 때문에 어떤 메서드를 선택하든지 성능에 이슈는 없습니다.

 

 

3. Get 리스트

 

get은 단일 결과가 아닌 멀티개의 결과도 읽을 수 있도록 제공합니다.

 

Result[] get(List<Get> gets) throws IOException

 

put 리스트와 차이점으로는 예외 발생시 입니다.

get의 경우에는 List<Get>의 동일한 크기의 배열을 반환받는것과 아예 예외를 반환받는 둘 중에 한가지로 동작이 됩니다.

 

 

4. Get 관련 기타 메서드

 

단순히 데이터를 읽어오는 메서드말고도 제공하는 메서드에 대해서 알아 보겠습니다.

 

boolean exists(Get get) throws IOException
Result getRowOrBefore(byte[] row, byte[] family)

 

exists 의 경우에는 메서드명 그대로 지정한 로우가 있는지 확인하기 위한 메서드입니다.

 

getRowOrBefore의 경우에는 지정한 로우가 있으면 반환하지만 없으면 바로 그 이전 로우를 반환합니다.

이전 로우도 없다면 null를 반환합니다.

Hbase는 로우키 기준으로 사전편찬식으로 정렬되어 저장되기 때문에 이러한 기능이 가능합니다.

 

 

3) Delete 메서드

 

이번엔 CRUD중 D에 해당하는 delete 메서드에 대해 알아 보겠습니다.

 

 

1. 단일 Delete

void delete(Delete delete) throws IOException

 

put과 get과 마찬가지로 Delete 전용 클래스를 인자로 받습니다.

 

Delete 클래스의 생성자는 아래와 같습니다.

 

Delete(byte[] row)
Delete(byte[] row, long timestamp, RowLock rowLock)

 

추가로 삭제 범위를 줄일때에는 아래 메서드를 활용하면 됩니다.

 

Delete deleteFamily(byte[] family)
Delete deleteFamily(byte[] family, long timestamp)
Delete deleteColumns(byte[] family, byte[] qualifier)
Delete deleteColumns(byte[] family, byte[] qualifier, long timestamp)
Delete deleteColumn(byte[] family, byte[] qualifier) // 최근 버전만삭제 
Delete deleteColumn(byte[] family, byte[] qualifier, long timestamp) // 지정한 타임스탬프와 일치하는것만 삭제
void setTimestamp(long timestamp) // 전체 컬럼패밀리 대상으로 지정한 타임스탬프 이하인 값들을 삭제

 

2. Delete 리스트

 

delete도 위 put, get과 마찬가지로 단일 뿐만이 아니라 List<Delete>도 제공합니다.

 

void delete(List<Delete> deletes) throws IOException

 

Delete의 예외 방식에 대해 알아보겠습니다.

 

delete()는 인수로 전달했던 List<Delete> 에서 실패한 Delete 객체만이 남게됩니다.

결국, 모두 성공했을때는 인자로 받은 List<Delete>는 텅비게 되어집니다.

 

 

3. 원자적 확인 후 삭제 연산

 

delete에서도 put과 동일하게 원자적으로 확인 후 삭제 연산을 할 수 있도록 제공하고 있습니다.

 

boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final byte[] value, final Delete delete) throws IOException
boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOp compareOp, final byte[] value, final Delete delete) throws IOException
boolean checkAndDelete(final byte[] row, final byte[] family, final byte[] qualifier, final CompareOperator op, final byte[] value, final Delete delete) throws IOException

 

value 파라미터에 null을 넣어서 수행하게 되면 해당 컬럼 값이 존재하는지 여부를 검사합니다.

 

 

4. 일괄처리 연산

지금까지는 단일 또는 리스트 기반의 연산을 알아봤습니다.

 

이번에는 여러개의 로우에 대해서 다양한 연산을 일괄처리하는 API에 대해서 알아보겠습니다.

 

클라이언트는 이 일괄처리를 위해 Put, Get, Delete의 조상 클래스인 Row 클래스를 제공합니다.

 

메서드는 아래와 같습니다.

 

void batch(final List<? extends Row> actions, final Object[] results)

 

여러 Row 하위 클래스들을 인자로 받아 처리 후 2번째 인자로 받은 Objects 배열에 결과를 담습니다.

 

또 한가지 메서드가 있는데 명세는 아래와 같습니다.

 

Object[] batch(final List<? extends Row> actions) throws IOException, InterruptedException

 

이 메서드는 예외가 발생하면 throws가 되므로 결과값 배열에는 아무것도 담기지 않게됩니다.

 

위 두 batch 메서드 수행시에는 Put 인스턴스가 쓰기 버퍼에 저장되지 않습니다.

 

아래는 예제입니다.

 

private final static byte[] ROW1 = Bytes.toBytes("row1");
private final static byte[] ROW2 = Bytes.toBytes("row2");
private final static byte[] COLFAM1 = Bytes.toBytes("colfam1");
private final static byte[] COLFAM2 = Bytes.toBytes("colfam2");
private final static byte[] QUAL1 = Bytes.toBytes("qual1");
private final static byte[] QUAL2 = Bytes.toBytes("qual2");

public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    HTable hTable = new HTable(conf, "testtable");
    
    List<Row> batch = new ArrayList<>();
    
    Put put = new Put(ROW2);
    put.addColumn(COLFAM2, QUAL1,  Bytes.toBytes("val5"));
    batch.add(put);
    
    Get get1 = new Get(ROW1);
    get1.addColumn(COLFAM1, QUAL1);
    batch.add(get1);
    
    Delete delete = new Delete(ROW1);
    delete.deleteColumns(COLFAM1, QUAL2);
    batch.add(delete);
    Get get2 = new Get(ROW2);
    get2.addFamily(Bytes.toBytes("BOGUS"));
    batch.add(get2);
    
    Object[] results = new Object[batch.size()];
    
    try {
        hTable.batch(batch, results);
    } catch (Exception e) {
        System.out.println("ERROR : " + e);
    }
    
    Arrays.stream(results).forEach(item -> System.out.println("Result : " + item));
}

 

5. 로우 락

Hbase의 경우 각 로우에 대해 원자성을 보장하여 순차적으로 처리합니다.

그렇기 때문에 위에서 알아본 put, delete, checkAndPut 등도 모두 원자성이 보장되면서 수행이 되어집니다.

 

한 예로 Put 인스턴스를 생성시에 RowLock을 부여하지 않는다면 서버측에서는 해당 로우에 대해서 

메서드가 수행되는 동안에만 유지되는 락을 생성하여 적용하게 됩니다.

 

만약, 한 로우에 대해서 클라이언트측이 커스텀하게 락을 획득 및 해제를 하고 싶은 경우에는 아래 메서드를 이용하면 됩니다.

단, 잘못하면 다른 연산 메서드에 영향이 갈 수 있으니 신중히 사용해야 합니다.

 

RowLock lockRow(byte[] row) throws IOException
void unlockRow(RowLock rl) throws IOException

 

락은 다른 수행에 영향이 끼칠 수 있으니 무기한으로 설정되어 있지는 않습니다.

hbase-site.xml에 hbase.resionserver.lease.period 에 값만큼이 락의 유효 기간입니다.

디폴트 1분입니다.

 

6. 스캔

Hbase는 범위 탐색인 스캔 기능을 제공합니다.

스캔은 Hbase가 제공하는 순차적이고 정렬된 저장구조를 활용합니다.

 

1) 소개

 

Hbase 클라이언트는 Scan 이라는 별도의 클래스를 제공하고 있습니다.

 

아래는 사용 메서드입니다.

 

ResultScanner getScanner(Scan scan)
ResultScanner getScanner(byte [] family)
ResultScanner getScanner(byte [] family, byte [] qualifier)

 

Scan 인자를 주면 내부적으로 스캔한 결과를 담은 ResultScanner를 반환합니다.

2,3번째 메서드는 내부적으로 Scan을 만들어서 동작 후 결과를 반환합니다.

 

아래는 Scan의 생성자입니다.

 

Scan()
Scan(byte[] startRow, Filter filter)
Scan(byte[] startRow)
Scan(byte[] startRow, byte[] stopRow)

 

생성자에서 알 수 있듯이 Scan은 꼭 정확한 로우를 몰라도 탐색이 가능합니다.

startRow만 주게된다면 startRow보다 같거나 큰 로우들부터 탐색을하게 됩니다.

stopRow는 탐색이 해당 stopRow보다 크거나 같은 로우를 만나면 끝나게 됩니다.

 

Filter는 말그대로 탐색 중 사용자가 지정한 데이터만을 얻기위해서 있는 인자입니다.

 

인자가 없는 기본 생성자로도 수행은 되며, 이경우에는 모든 컬럼패밀리 및 그에 속한 컬럼을 포함한 전체 테이블을 스캔합니다.

 

아래는 위 CRUD의 메서드들과 같이 탐색의 범위를 줄이기 위한 메서드입니다.

 

Scan addFamily(byte [] family)
Scan addColumn(byte [] family, byte [] qualifier)
Scan setTimeRange(long minStamp, long maxStamp) throws IOException
Scan setTimeStamp(long timestamp)
Scan setMaxVersions()
Scan setMaxVersions(int maxVersions)

 

그 외에도 Scan 클래스의 경우에는 getter/setter가 있어 언제든지 활용이 가능합니다.

 

 

2) ResultScanner 클래스

 

스캔의 경우에는 한번의 RPC로 로우들을 클라이언트한테 반환하지 않습니다.

이유는, 방대한 로우가 탐색될 수도 있어 한번의 RPC로는 오버헤드가 발생할 수 있기때문입니다.

 

때문에, ResultScanner는 Get과 비슷한 연산으로 변환 후 Row에 대한 Result 인스턴스를 이터레이트할 수 있도록 감싼 형태입니다.

 

아래는 ResultScanner의 메서드입니다.

 

Result next() throws IOException // 한번에 한 Result 반환
Result[] next(int nbRows) throws IOException // nbRows 만큼의 Result 반환
void close()

 

스캐너의 경우 서버 측 리소스를 꽤 사용하기 때문에 힙 공간을 낭비하게 됩니다.

때문에, 꼭 작업이 끝난다음에는 close를 명시적으로 호출하여 자원을 반환하게 해야합니다.

 

 

아래는 스캐너 사용 예제입니다.

 

public static void main(String[] args) throws IOException {
    Configuration conf = HBaseConfiguration.create();
    HTable hTable = new HTable(conf, "testtable");

    Scan scan1 = new Scan();
    ResultScanner scanner1 = hTable.getScanner(scan1);
    for(Result res: scanner1) {
        System.out.println(res);
    }
    scanner1.close();

    Scan scan2 = new Scan();
    scan2.addFamily(Bytes.toBytes("colfam1"));
    ResultScanner scanner2 = hTable.getScanner(scan2);
    for(Result res: scanner2) {
        System.out.println(res);
    }
    scanner2.close();

    Scan scan3 = new Scan();
    scan3.addColumn(Bytes.toBytes("colfam1"), Bytes.toBytes("col-5"))
            .addColumn(Bytes.toBytes("colfam2"), Bytes.toBytes("col-33"))
            .setStartRow(Bytes.toBytes("row-10"))
            .setStopRow(Bytes.toBytes("row-20"));
    ResultScanner scanner3 = hTable.getScanner(scan3);
    for(Result res: scanner3) {
        System.out.println(res);
    }
    scanner3.close();
}

 

3) 캐싱 대 일괄처리

 

스캔의 경우 next메서드가 호출될때 마다 RPC가 발생합니다.

이는, 데이터 크기가 작은 경우에는 오히려 성능상으로 안좋을 수 있습니다.

 

때문에, Hbase에서는 스캐너 캐싱이라는 기능을 제공합니다.

 

이 캐싱 기능은 아래와 같이 2개의 다른 수중에서 활성화할 수 있습니다.

 

  • 테이블 단위의 캐싱
  • 스캔 단위의 캐싱

 

1. 테이블 단위 캐싱

 

테이블 단위의 캐싱을 사용할 때는 아래 메서드를 사용하면 됩니다.

 

void setScannerCaching(int scannerCaching)
int getScannerCaching()

 

아래는 스캔 단위의 캐싱 메서드입니다.

 

void setCaching(int caching)
int getCaching()

 

이러한 캐싱의 효과로는 RPC가 반환하는 로우 갯수를 사용자가 제어할 수 있다는 점입니다.

무조건 캐싱양을 늘리기보다는 클라이언트와 서버의 스펙을 고려하여 최적점을 찾아야 합니다.
무제한으로 올리게 된다면 최악에는 OutOfMemoryException이 발생하게 됩니다.

 

하지만 예기치 못하게 하나의 로우가 매우 커서 메모리 이슈가 발생할 수 도 있습니다.

 

Hbase에서는 이를 위해 일괄처리 기능을 제공합니다.

 

void setBatch(int batch)
int getBatch()

 

일괄처리는 캐싱과 다르게 로우단위가 아닌 컬럼단위로 동작합니다.

한마디로 next 메서드 호출 시 반환되는 컬럼갯수를 제어합니다.

 

예를들어, setBatch(5)로 세팅을 하게된다면 Result 인스턴스마다 다섯개의 컬럼이 반환되어집니다.

 

만약 로우의 컬럼이 17개이고 일괄처리를 5로 설정했을때는 하나의 로우를 조각내어
5->5->5->2 개로 반환받게 됩니다.

 

처음에는 이 2개를 고려하지 않아도 되지만,

점차 서비스가 커지면서 최적화를 해야한다면 위의 스캐너 캐싱과 일괄처리를 적절히 조합하여 사용해야합니다.

 

 

7. 마무리

 

이번 포스팅에서는 클라이언트 API : 기본 기능에 대해 진행하였습니다.

 

다음 포스팅에서는 챕터 4장인 클라이언트 API : 고급 기능에 대해 진행하겠습니다.

반응형

'BigData > Hbase' 카테고리의 다른 글

(5) 클라이언트 API : 관리 기능  (0) 2020.06.02
(4) 클라이언트 API : 고급 기능  (0) 2020.04.19
(2) 설치  (0) 2020.04.08
(1) 소개  (0) 2020.04.07
반응형

1. 서론

 

이번 포스팅에서는 Hadoop 카테고리에 있는 Hadoop 설치에서 Hbase를 추가하는 과정을 진행하겠습니다.

 

2. 설치

1) Hbase 서비스 추가

 

먼저 설치되어 있는 클라우데라 매니저 UI에 접속하여 아래와 같이 서비스 추가를 클릭합니다.

 

 

클릭하게 되면 아래와 같이 추가할 수 있는 서비스 목록 화면으로 넘어가게 됩니다.

Hbase를 선택해 주세요.

 

 

2) 서버 선택

 

Hbase를 선택하고 완료 버튼을 누르게 되면 마스터, 리전, 쓰리프트, Rest 용도의 서비스를 어느 서버에 추가할 지 선택하는 화면이 나옵니다.

 

 

저의 경우, 포스팅을 위해 사전에 미리 설치하여 위와 같이 빨간색으로 뜨고 있습니다.

저는 총 1~4번 서버 중에  Master는 1번 리전서버는 2~4번 서버를 선택하였습니다.

Rest 서버와 쓰리프트 서버는 별도로 등록하진 않았습니다. 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

 

3) Hbase 설치 완료

 

이제 계속 버튼을 눌러 설치를 진행하게되면 Hbase가 설치가 됩니다.

 

이런 편리함 때문에, 클라우데라와 같은 매니저 역할의 상용 서비스를 사용하는것이 편합니다.

 

 

4) 기타 설정

 

설치는 완료 되었지만 Hbase에는 느낌표와 빨간 경고표시가 많이 보이게 될 것입니다.

 

이 부분은 Hadoop 설치 부분에서도 언급했었던, heap 사이즈를 root로 잡고 있기 때문입니다.

 

root의 디스크 공간이 충분하다면 안보이실 수 있습니다.

 

heap의 경우에는 아래와 같이 경로를 변경해 줍니다.

저의 경우에는 /home1/irteam/tmp로 변경하였습니다.

 

 

추가로 hbase관련 log를 적재하는 디렉터리도 root가 부족하다면 변경해야 합니다.

이것은 직접 서버에 들어가 아래와 같이 수정해주시면 됩니다.

 

sudo mkdir -p /home1/irteam/var/log/hbase
sudo mv /var/log/hbase/* /home1/irteam/var/log/hbase
sudo rm -rf /var/log/hbase
sudo ln -s /home1/irteam/var/log/hbase /var/log/hbase
sudo chmod -R 777 /home1/irteam

 

저의 경우에는 기본 경로인 /var/log/hbase에는 디스크가 충분하지 않아,

충분한 /home1/irteam/var/log/hbase 로 데이터를 옮긴 후 심볼릭 링크를 걸었습니다.

 

 

5) 재시작

 

이젠 heap 경로의 설정이 변경된것을 반영하기 위해 Hbase를 아래와 같이 재시작해줍니다.

 

3. 설치 확인

설치가 제대로 되었는지 확인하기 위해 서버에 접속하여 hbase 쉘에 접속 테스트를 진행해 봅니다.

 

아래와 같이 hbase shell 명령어를 통해 접속이 가능합니다.

 

간단히 클러스터의 상태를 확인하는 status 명령어를 수행하여 정상적으로 돌고 있는지 확인합니다.

 

위에서 설명한것과 같이 1개의 마스터와 3개의 리전서버가 active 한것을 볼 수 있습니다.

 

4. 마무리

 

이번 포스팅에서는 Hbase 설치를 진행해봤습니다.

 

다음 포스팅에서는 챕터 3장인 클라이언트 API:기본 기능에 대해 진행하도록 하겠습니다.

 

반응형

'BigData > Hbase' 카테고리의 다른 글

(5) 클라이언트 API : 관리 기능  (0) 2020.06.02
(4) 클라이언트 API : 고급 기능  (0) 2020.04.19
(3) 클라이언트 API : 기본 기능  (0) 2020.04.09
(1) 소개  (0) 2020.04.07

+ Recent posts