반응형

1. 서론

저번 포스팅에서는 impala- mybatis 사용에 대해서 알아보았습니다.

 

그럼, impala는 SQL 지원이 되고 relation이 있으니 ORM 기술인 hibernate도 사용할 수 있지 않나? 라고 생각할 수 있습니다.

 

이 질문은 반은 맞고, 반은 틀리다고 할 수 있습니다.

이유는, hibernate 이념과 impala 이념이 다르기 때문입니다.

 

hibernate는 기본적으로 트랜잭션이 가능한 저장소 를 지원하며, 데이터의 CUD는 무조건 트랜잭션을 통해 이루어집니다.

 

하지만, impala는 하나의 데이터 저장소가 아닌 hdfs, kudu, hbase와 같이 여러 저장소를 지원하는 SQL 인터페이스이며,

가장 중요한 트랜잭션이 없습니다. 게다가 hdfs의 경우에는 delete, update는 되지도 않습니다.

 

하둡의 hbase, kudu 등등은 저장소라고 볼 수 있지만, 다른 dbms들과 같이 트랜잭션개념은 없습니다.
이런 저장소 프로젝트들은 자체 클라이언트를 제공하고 있으며 실패에 대한 처리는 개발자의 몫으로 넘깁니다.

물론, atomic 특성을 제공하는 저장소도 있지만 이것은 트랜잭션이랑은 별개의 내용입니다.

 

하지만, 조회의 경우에는 말이 달라집니다. 일반적으로 조회는 트랜잭션이 필요없습니다.

게다가, 제가 일하는 조직에서는 DB의 실시간 백업용으로 ROS용 Kudu를 사용하고 있습니다.

 

저장소 자체가 ROS이기에 어플리케이션 단에서는 조회만을 하게 되고, hibernate의 엔티티 관계에 대한 이점 을 누릴 수 있습니다.

 

2. 문제점

하지만, Dialect 관련 문제가 하나 있었습니다.

 

hibernate는 기본적으로 Dialect(= 방언)를 설정하지 않으면,

jdbc를 통해 알맞은 데이터베이스가 무엇이고 데이터베이스에 맞는 Dialect을 찾아 세팅해줍니다.

 

jdbc 로부터 데이터베이스를 찾아 Dialect을 세팅하는 과정은 아래와 같습니다.

 

1) DialectResolverInitiator

 

hibernate에서는 Dialect 결정을 위한 서비스 클래스로 DialectResolverInitiator 클래스의 initiateService 메서드를 호출합니다.

 

아래는 DialectResolverInitiator 클래스의 initiateService 메서드입니다.

 

@Override
public DialectResolver initiateService(Map configurationValues, ServiceRegistryImplementor registry) {
	final DialectResolverSet resolver = new DialectResolverSet();

	applyCustomerResolvers( resolver, registry, configurationValues );
	resolver.addResolver( new StandardDialectResolver() );
    
	return resolver;
}

 

코드에서 보이는것과 같이 customResolver 제외하고, 기본 Resolver로 StandardDialectResolver 를 사용합니다.

 

 

2) StandardDialectResolver

 

아래는 StandardDialectResolver 클래스 코드입니다.

hibernate-core에 있는 Database enum 클래스를 iterate돌며 jdbc에 알맞은 dialect를 찾는 로직입니다.

public final class StandardDialectResolver implements DialectResolver {

	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {

		for ( Database database : Database.values() ) {
			Dialect dialect = database.resolveDialect( info );
			if ( dialect != null ) {
				return dialect;
			}
		}

		return null;
	}
}

 

3) DataBase

 

아래는 DataBase enum 클래스의 일부 코드입니다.

코드에서 보이듯이 각 Database에 대해서 정의가 되어 있으며, 인자로 받은 DialectResolutionInfo info 를 통해 맞는 Database인지 확인하는 로직이 있습니다.

 

DialectResolutionInfo 에서 사용하는 getDatabaseName, getDatabaseMajorVersion 등의 메서드는 jdbc의 DatabaseMetaData interface 구현체 메서드를 사용하도록 되어있습니다.

 

ORACLE {
	@Override
	public Class<? extends Dialect> latestDialect() {return Oracle12cDialect.class;}

	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {
		final String databaseName = info.getDatabaseName();

		if ( "Oracle".equals( databaseName ) ) {
			final int majorVersion = info.getDatabaseMajorVersion();

			switch ( majorVersion ) {
				case 12:
					return new Oracle12cDialect();
				case 11:
					// fall through
				case 10:
					return new Oracle10gDialect();
				case 9:
					return new Oracle9iDialect();
				case 8:
					return new Oracle8iDialect();
				default:
					return latestDialectInstance( this );
			}
		}

		return null;
	}
},
POINTBASE {
	@Override
	public Class<? extends Dialect> latestDialect() {
		return PointbaseDialect.class;
	}
	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {
		return null;
	}
},
POSTGRESQL {
	@Override
	public Class<? extends Dialect> latestDialect() {
		return PostgreSQL10Dialect.class;
	}
	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {
		final String databaseName = info.getDatabaseName();
		if ( "PostgreSQL".equals( databaseName ) ) {
			final int majorVersion = info.getDatabaseMajorVersion();
			final int minorVersion = info.getDatabaseMinorVersion();
			if ( majorVersion < 8 ) {
				return new PostgreSQL81Dialect();
			}
			if ( majorVersion == 8 ) {
				return minorVersion >= 2 ? new PostgreSQL82Dialect() : new PostgreSQL81Dialect();
			}
			if ( majorVersion == 9 ) {
				if ( minorVersion < 2 ) {
					return new PostgreSQL9Dialect();
				}
				else if ( minorVersion < 4 ) {
					return new PostgreSQL92Dialect();
				}
				else if ( minorVersion < 5 ) {
					return new PostgreSQL94Dialect();
				}
				else {
					return new PostgreSQL95Dialect();
				}
			}
			return latestDialectInstance( this );
		}
		return null;
	}
},

 


하지만 서론에서 말씀드린것과 같이 hibernate 이념과 impala는 맞지 않아,

Database enum 클래스에는 IMPALA 가 없을 뿐더러 ImpalaDialect도 존재하지 않습니다.

위에 보여드린 3개 클래스는 모두 hibernate-core lib에 있습니다.

 

 

반응형

3. 해결법

1) Hibernate PR

 

위 이슈와 관련해서, hibernate 쪽에 PR을 올려둔 상태입니다.

하지만, hibernate의 이념과 맞지 않고, 너무 특정케이스를 위해서이기 때문에 PR에 대한 approve가 될지는 모르겠습니다.....ㅠ

소스 변경이 아닌 추가여서, 장애 포인트는 없지만 이념과 다르다는 피드백을 받을거 같네요....ㅎㅎ

 

2) Custom resolver, database, dialect 등록

 

그래도 사용은 할 수 있습니다.

바로 위에서 살펴본 DialectResolverInitiator 클래스에 customResolver를 등록하는 코드가 있기 때문입니다!!

 

혹시나, 저와 같이 필요한 경우가 있다면 아래와 같이 커스텀한 resolver, database, dialect를 사용하여

조회용도의 impala-hibernate 조합을 사용할 수 있습니다.

 

1. Custom Resolver

 

커스텀한 Resolver를 만들어줍니다.

 

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public class CustomDialectResolver implements DialectResolver {

    @Override
    public Dialect resolveDialect(DialectResolutionInfo info) {
        for ( ImpalaDatabase database : ImpalaDatabase.values() ) {
            Dialect dialect = database.resolveDialect( info );
            if ( dialect != null ) {
                return dialect;
            }
        }
        return null;
    }
}

 

2. Custom Database 

 

Resolver에서 사용하는 Database enum 클래스를 만들어 줍니다.

코드에서 알 수 있듯이, ImpalaJdbc는 getDatabaseName 메서드로 Impala 를 반환합니다.

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public enum ImpalaDatabase {

    IMPALA {
        @Override
        public Class<? extends Dialect> latestDialect() {
            return ImpalaDialect.class;
        }

        @Override
        public Dialect resolveDialect(DialectResolutionInfo info) {
            final String databaseName = info.getDatabaseName();
            if ("Impala".equals(databaseName)) {
                return latestDialectInstance(this);
            }
            return null;
        }
    };

    public abstract Class<? extends Dialect> latestDialect();

    public abstract Dialect resolveDialect(DialectResolutionInfo info);

    private static Dialect latestDialectInstance(ImpalaDatabase database) {
        try {
            return database.latestDialect().newInstance();
        }
        catch (InstantiationException | IllegalAccessException e) {
            throw new HibernateException( e );
        }
    }
}

 

3. Custom Dialect

 

Database에서 반환할 Dialect를 만듭니다.

해당 Dialect은 Cloudera Document - SQL Reference를 참고하여 만들었습니다.

모든 function, dml, ddl에 대해서 확인하는데에 한계가 있어 틀린부분이 있을 수 있지만
애초에 Custom Dialect이기 때문에 해당 부분은 fix하여 사용하면 됩니다.

 

import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.sql.Types;
import org.hibernate.MappingException;
import org.hibernate.cfg.Environment;
import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.function.StandardSQLFunction;
import org.hibernate.dialect.pagination.AbstractLimitHandler;
import org.hibernate.dialect.pagination.LimitHandler;
import org.hibernate.dialect.pagination.LimitHelper;
import org.hibernate.engine.jdbc.env.spi.NameQualifierSupport;
import org.hibernate.engine.spi.RowSelection;
import org.hibernate.tool.schema.extract.internal.SequenceInformationExtractorHSQLDBDatabaseImpl;
import org.hibernate.tool.schema.extract.spi.SequenceInformationExtractor;
import org.hibernate.type.StandardBasicTypes;

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public class ImpalaDialect extends Dialect {

    private final class ImpalaLimitHandler extends AbstractLimitHandler {
        @Override
        public String processSql(String sql, RowSelection selection) {
            final boolean hasOffset = LimitHelper.hasFirstRow( selection );
            return sql + (hasOffset ? " limit ? offset ?" : " limit ?");
        }

        @Override
        public boolean supportsLimit() {
            return true;
        }

        @Override
        public boolean bindLimitParametersFirst() {
            return true;
        }
    }

    private final LimitHandler limitHandler;


    /**
     * Constructs a ImpalaDialect
     */
    public ImpalaDialect() {
        super();

        registerColumnType( Types.BIGINT, "bigint" );
        registerColumnType( Types.BOOLEAN, "boolean" );
        registerColumnType( Types.CHAR, "char($l)" );
        registerColumnType( Types.DECIMAL, "decimal($p,$s)" );
        registerColumnType( Types.DOUBLE, "double" );
        registerColumnType( Types.FLOAT, "float" );
        registerColumnType( Types.INTEGER, "int" );
        registerColumnType( Types.SMALLINT, "smallint" );
        registerColumnType( Types.VARCHAR, "string" );
        registerColumnType( Types.BLOB, "string" );
        registerColumnType( Types.CLOB, "string" );
        registerColumnType( Types.TINYINT, "tinyint" );
        registerColumnType( Types.TIMESTAMP, "timestamp" );

        // Impala Mathematical Functions
        registerFunction( "abs", new StandardSQLFunction("abs") );
        registerFunction( "acos", new StandardSQLFunction( "acos", StandardBasicTypes.DOUBLE ) );
        registerFunction( "asin", new StandardSQLFunction( "asin", StandardBasicTypes.DOUBLE ) );
        registerFunction( "atan", new StandardSQLFunction("atan", StandardBasicTypes.DOUBLE) );
        registerFunction( "atan2", new StandardSQLFunction("atan2", StandardBasicTypes.DOUBLE) );
        registerFunction( "bin", new StandardSQLFunction( "bin", StandardBasicTypes.STRING ) );
        registerFunction( "ceil", new StandardSQLFunction("ceil") );
        registerFunction( "dceil", new StandardSQLFunction("dceil") );
        registerFunction( "ceiling", new StandardSQLFunction("ceiling") );
        registerFunction( "conv", new StandardSQLFunction("conv") );
        registerFunction( "cos", new StandardSQLFunction( "cos", StandardBasicTypes.DOUBLE ) );
        registerFunction( "cot", new StandardSQLFunction( "cot", StandardBasicTypes.DOUBLE ) );
        registerFunction( "cosh", new StandardSQLFunction( "cosh", StandardBasicTypes.DOUBLE ) );
        registerFunction( "degrees", new StandardSQLFunction( "degrees", StandardBasicTypes.DOUBLE ) );
        registerFunction( "e", new StandardSQLFunction( "e", StandardBasicTypes.DOUBLE ) );
        registerFunction( "exp", new StandardSQLFunction( "exp", StandardBasicTypes.DOUBLE ) );
        registerFunction( "factorial", new StandardSQLFunction( "factorial", StandardBasicTypes.INTEGER) );
        registerFunction( "floor", new StandardSQLFunction( "floor", StandardBasicTypes.INTEGER ) );
        registerFunction( "dfloor", new StandardSQLFunction( "dfloor", StandardBasicTypes.INTEGER ) );
        registerFunction( "fmod", new StandardSQLFunction("fmod") );
        registerFunction( "fnv_hash", new StandardSQLFunction( "fnv_hash", StandardBasicTypes.INTEGER ) );
        registerFunction( "greatest", new StandardSQLFunction("greatest") );
        registerFunction( "hex", new StandardSQLFunction( "hex", StandardBasicTypes.STRING ) );
        registerFunction( "is_inf", new StandardSQLFunction( "is_inf", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "is_nan", new StandardSQLFunction( "is_nan", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "least", new StandardSQLFunction("least") );
        registerFunction( "ln", new StandardSQLFunction( "ln", StandardBasicTypes.DOUBLE ) );
        registerFunction( "log", new StandardSQLFunction( "log", StandardBasicTypes.DOUBLE ) );
        registerFunction( "log10", new StandardSQLFunction( "log10", StandardBasicTypes.DOUBLE ) );
        registerFunction( "log2", new StandardSQLFunction( "log2", StandardBasicTypes.DOUBLE ) );
        registerFunction( "max_int", new StandardSQLFunction( "max_int", StandardBasicTypes.INTEGER ) );
        registerFunction( "max_bigint", new StandardSQLFunction( "max_bigint", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "max_smallint", new StandardSQLFunction( "max_smallint", StandardBasicTypes.INTEGER ) );
        registerFunction( "max_tinyint", new StandardSQLFunction( "max_tinyint", StandardBasicTypes.INTEGER ) );
        registerFunction( "min_int", new StandardSQLFunction( "min_int", StandardBasicTypes.INTEGER ) );
        registerFunction( "min_bigint", new StandardSQLFunction( "min_bigint", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "min_smallint", new StandardSQLFunction( "min_smallint", StandardBasicTypes.INTEGER ) );
        registerFunction( "min_tinyint", new StandardSQLFunction( "min_tinyint", StandardBasicTypes.INTEGER ) );
        registerFunction( "mod", new StandardSQLFunction( "mod" ) );
        registerFunction( "murmur_hash", new StandardSQLFunction( "murmur_hash", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "negative", new StandardSQLFunction( "negative" ) );
        registerFunction( "pi", new StandardSQLFunction( "pi", StandardBasicTypes.DOUBLE ) );
        registerFunction( "pmod", new StandardSQLFunction( "pmod" ) );
        registerFunction( "positive", new StandardSQLFunction( "positive" ) );
        registerFunction( "pow", new StandardSQLFunction( "pow", StandardBasicTypes.DOUBLE ) );
        registerFunction( "power", new StandardSQLFunction( "power", StandardBasicTypes.DOUBLE ) );
        registerFunction( "dpow", new StandardSQLFunction( "dpow", StandardBasicTypes.DOUBLE ) );
        registerFunction( "fpow", new StandardSQLFunction( "fpow", StandardBasicTypes.DOUBLE ) );
        registerFunction( "precision", new StandardSQLFunction( "precision", StandardBasicTypes.INTEGER ) );
        registerFunction( "quotient", new StandardSQLFunction( "quotient", StandardBasicTypes.INTEGER ) );
        registerFunction( "radians", new StandardSQLFunction( "radians", StandardBasicTypes.DOUBLE ) );
        registerFunction( "rand", new StandardSQLFunction( "rand", StandardBasicTypes.DOUBLE ) );
        registerFunction( "random", new StandardSQLFunction( "random", StandardBasicTypes.DOUBLE ) );
        registerFunction( "round", new StandardSQLFunction( "round" ) );
        registerFunction( "dround", new StandardSQLFunction( "dround" ) );
        registerFunction( "scale", new StandardSQLFunction( "scale", StandardBasicTypes.INTEGER ) );
        registerFunction( "sign", new StandardSQLFunction( "sign", StandardBasicTypes.INTEGER ) );
        registerFunction( "sin", new StandardSQLFunction( "sin", StandardBasicTypes.DOUBLE ) );
        registerFunction( "sinh", new StandardSQLFunction( "sinh", StandardBasicTypes.DOUBLE ) );
        registerFunction( "sqrt", new StandardSQLFunction( "sqrt", StandardBasicTypes.DOUBLE ) );
        registerFunction( "tan", new StandardSQLFunction( "tan", StandardBasicTypes.DOUBLE ) );
        registerFunction( "tanh", new StandardSQLFunction("tanh", StandardBasicTypes.DOUBLE) );
        registerFunction( "truncate", new StandardSQLFunction( "truncate" ) );
        registerFunction( "dtrunc", new StandardSQLFunction( "dtrunc" ) );
        registerFunction( "trunc", new StandardSQLFunction( "trunc" ) );
        registerFunction( "unhex", new StandardSQLFunction( "unhex", StandardBasicTypes.STRING ) );
        registerFunction( "width_bucket", new StandardSQLFunction( "width_bucket" ) );

        // Impala Bit Functions
        registerFunction( "bitand", new StandardSQLFunction( "bitand" ) );
        registerFunction( "bitor", new StandardSQLFunction( "bitor" ) );
        registerFunction( "bitnot", new StandardSQLFunction( "bitnot" ) );
        registerFunction( "bitxor", new StandardSQLFunction( "bitxor" ) );
        registerFunction( "countset", new StandardSQLFunction( "countset" ) );
        registerFunction( "getbit", new StandardSQLFunction( "getbit" ) );
        registerFunction( "rotateleft", new StandardSQLFunction( "rotateleft" ) );
        registerFunction( "rotateright", new StandardSQLFunction( "rotateright" ) );
        registerFunction( "setbit", new StandardSQLFunction( "setbit" ) );
        registerFunction( "shiftleft", new StandardSQLFunction( "shiftleft" ) );
        registerFunction( "shiftright", new StandardSQLFunction( "shiftright" ) );

        // Impala Date and Time Functions
        registerFunction( "add_months", new StandardSQLFunction( "add_months", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "adddate", new StandardSQLFunction( "adddate", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "current_timestamp", new StandardSQLFunction( "current_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_add", new StandardSQLFunction( "date_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_part", new StandardSQLFunction( "date_part", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_sub", new StandardSQLFunction( "date_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_trunc", new StandardSQLFunction( "date_trunc", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "datediff", new StandardSQLFunction( "datediff", StandardBasicTypes.INTEGER ) );
        registerFunction( "day", new StandardSQLFunction( "day", StandardBasicTypes.INTEGER ) );
        registerFunction( "dayname", new StandardSQLFunction( "dayname", StandardBasicTypes.STRING ) );
        registerFunction( "dayofweek", new StandardSQLFunction( "dayofweek", StandardBasicTypes.INTEGER ) );
        registerFunction( "dayofmonth", new StandardSQLFunction( "dayofmonth", StandardBasicTypes.INTEGER ) );
        registerFunction( "dayofyear", new StandardSQLFunction( "dayofyear", StandardBasicTypes.INTEGER ) );
        registerFunction( "days_add", new StandardSQLFunction( "days_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "days_sub", new StandardSQLFunction( "days_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "extract", new StandardSQLFunction( "extract", StandardBasicTypes.INTEGER ) );
        registerFunction( "from_timestamp", new StandardSQLFunction( "from_timestamp", StandardBasicTypes.STRING ) );
        registerFunction( "from_unixtime", new StandardSQLFunction( "from_unixtime", StandardBasicTypes.STRING ) );
        registerFunction( "from_utc_timestamp", new StandardSQLFunction( "from_utc_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "hour", new StandardSQLFunction( "hour", StandardBasicTypes.INTEGER ) );
        registerFunction( "hours_add", new StandardSQLFunction( "hours_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "hours_sub", new StandardSQLFunction( "hours_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "int_months_between", new StandardSQLFunction( "int_months_between", StandardBasicTypes.INTEGER ) );
        registerFunction( "microseconds_add", new StandardSQLFunction( "microseconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "microseconds_sub", new StandardSQLFunction( "microseconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "millisecond", new StandardSQLFunction( "millisecond", StandardBasicTypes.INTEGER ) );
        registerFunction( "milliseconds_add", new StandardSQLFunction( "milliseconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "milliseconds_sub", new StandardSQLFunction( "milliseconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "minute", new StandardSQLFunction( "minute", StandardBasicTypes.INTEGER ) );
        registerFunction( "minutes_add", new StandardSQLFunction( "minutes_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "minutes_sub", new StandardSQLFunction( "minutes_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "month", new StandardSQLFunction( "month", StandardBasicTypes.INTEGER ) );
        registerFunction( "monthname", new StandardSQLFunction( "monthname", StandardBasicTypes.STRING ) );
        registerFunction( "months_add", new StandardSQLFunction( "months_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "months_between", new StandardSQLFunction( "months_between", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "months_sub", new StandardSQLFunction( "months_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "nanoseconds_add", new StandardSQLFunction( "nanoseconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "nanoseconds_sub", new StandardSQLFunction( "nanoseconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "next_day", new StandardSQLFunction( "next_day", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "now", new StandardSQLFunction( "now", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "quarter", new StandardSQLFunction( "quarter", StandardBasicTypes.INTEGER ) );
        registerFunction( "second", new StandardSQLFunction( "second", StandardBasicTypes.INTEGER ) );
        registerFunction( "seconds_add", new StandardSQLFunction( "seconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "seconds_sub", new StandardSQLFunction( "seconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "subdate", new StandardSQLFunction( "subdate", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "timeofday", new StandardSQLFunction( "timeofday", StandardBasicTypes.STRING ) );
        registerFunction( "timestamp_cmp", new StandardSQLFunction( "timestamp_cmp", StandardBasicTypes.INTEGER ) );
        registerFunction( "to_date", new StandardSQLFunction( "to_date", StandardBasicTypes.STRING ) );
        registerFunction( "to_timestamp", new StandardSQLFunction( "to_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "to_utc_timestamp", new StandardSQLFunction( "to_utc_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "unix_timestamp", new StandardSQLFunction( "unix_timestamp", StandardBasicTypes.INTEGER ) );
        registerFunction( "utc_timestamp", new StandardSQLFunction( "utc_timestamp", StandardBasicTypes.INTEGER ) );
        registerFunction( "weekofyear", new StandardSQLFunction( "weekofyear", StandardBasicTypes.INTEGER ) );
        registerFunction( "weeks_add", new StandardSQLFunction( "weeks_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "weeks_sub", new StandardSQLFunction( "weeks_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "year", new StandardSQLFunction( "year", StandardBasicTypes.INTEGER ) );
        registerFunction( "years_add", new StandardSQLFunction( "years_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "years_sub", new StandardSQLFunction( "years_sub", StandardBasicTypes.TIMESTAMP ) );

        // Impala Conditional Functions
        registerFunction( "coalesce", new StandardSQLFunction( "coalesce" ) );
        registerFunction( "decode", new StandardSQLFunction( "decode" ) );
        registerFunction( "if", new StandardSQLFunction( "if" ) );
        registerFunction( "ifnull", new StandardSQLFunction( "ifnull" ) );
        registerFunction( "isfalse", new StandardSQLFunction( "isfalse", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "nullvalue", new StandardSQLFunction( "nullvalue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "nonnullvalue", new StandardSQLFunction( "nonnullvalue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "istrue", new StandardSQLFunction( "istrue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "isnotfalse", new StandardSQLFunction( "isnotfalse", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "isnottrue", new StandardSQLFunction( "isnottrue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "zeroifnull", new StandardSQLFunction( "zeroifnull" ) );
        registerFunction( "nvl2", new StandardSQLFunction( "nvl2" ) );
        registerFunction( "nvl", new StandardSQLFunction( "nvl" ) );
        registerFunction( "nullifzero", new StandardSQLFunction( "nullifzero" ) );
        registerFunction( "nullif", new StandardSQLFunction( "nullif" ) );
        registerFunction( "isnull", new StandardSQLFunction( "isnull" ) );

        // Impala String Functions
        registerFunction( "ascii", new StandardSQLFunction( "ascii", StandardBasicTypes.INTEGER ) );
        registerFunction( "base64encode", new StandardSQLFunction( "base64encode", StandardBasicTypes.STRING ) );
        registerFunction( "base64decode", new StandardSQLFunction( "base64decode", StandardBasicTypes.STRING ) );
        registerFunction( "left", new StandardSQLFunction( "left", StandardBasicTypes.STRING ) );
        registerFunction( "initcap", new StandardSQLFunction( "initcap", StandardBasicTypes.STRING ) );
        registerFunction( "group_concat", new StandardSQLFunction( "group_concat", StandardBasicTypes.STRING ) );
        registerFunction( "concat_ws", new StandardSQLFunction( "concat_ws", StandardBasicTypes.STRING ) );
        registerFunction( "concat", new StandardSQLFunction( "concat", StandardBasicTypes.STRING ) );
        registerFunction( "chr", new StandardSQLFunction( "chr", StandardBasicTypes.STRING ) );
        registerFunction( "btrim", new StandardSQLFunction( "btrim", StandardBasicTypes.STRING ) );
        registerFunction( "instr", new StandardSQLFunction( "instr", StandardBasicTypes.INTEGER ) );
        registerFunction( "find_in_set", new StandardSQLFunction( "find_in_set", StandardBasicTypes.INTEGER ) );
        registerFunction( "char_length", new StandardSQLFunction( "char_length", StandardBasicTypes.INTEGER ) );
        registerFunction( "levenshtein", new StandardSQLFunction( "levenshtein", StandardBasicTypes.INTEGER ) );
        registerFunction( "length", new StandardSQLFunction( "length", StandardBasicTypes.INTEGER ) );
        registerFunction( "locate", new StandardSQLFunction( "locate", StandardBasicTypes.INTEGER ) );
        registerFunction( "lcase", new StandardSQLFunction( "lcase", StandardBasicTypes.STRING ) );
        registerFunction( "lower", new StandardSQLFunction( "lower", StandardBasicTypes.STRING ) );
        registerFunction( "right", new StandardSQLFunction( "right", StandardBasicTypes.STRING ) );
        registerFunction( "reverse", new StandardSQLFunction( "reverse", StandardBasicTypes.STRING ) );
        registerFunction( "replace", new StandardSQLFunction( "replace", StandardBasicTypes.STRING ) );
        registerFunction( "repeat", new StandardSQLFunction( "repeat", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_replace", new StandardSQLFunction( "regexp_replace", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_extract", new StandardSQLFunction( "regexp_extract", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_escape", new StandardSQLFunction( "regexp_escape", StandardBasicTypes.STRING ) );
        registerFunction( "parse_url", new StandardSQLFunction( "parse_url", StandardBasicTypes.STRING ) );
        registerFunction( "ltrim", new StandardSQLFunction( "ltrim", StandardBasicTypes.STRING ) );
        registerFunction( "lpad", new StandardSQLFunction( "lpad", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_like", new StandardSQLFunction( "regexp_like", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "ucase", new StandardSQLFunction( "ucase", StandardBasicTypes.STRING ) );
        registerFunction( "upper", new StandardSQLFunction( "upper", StandardBasicTypes.STRING ) );
        registerFunction( "trim", new StandardSQLFunction( "trim", StandardBasicTypes.STRING ) );
        registerFunction( "translate", new StandardSQLFunction( "translate", StandardBasicTypes.STRING ) );
        registerFunction( "substring", new StandardSQLFunction( "substring", StandardBasicTypes.STRING ) );
        registerFunction( "substr", new StandardSQLFunction( "substr", StandardBasicTypes.STRING ) );
        registerFunction( "strright", new StandardSQLFunction( "strright", StandardBasicTypes.STRING ) );
        registerFunction( "strleft", new StandardSQLFunction( "strleft", StandardBasicTypes.STRING ) );
        registerFunction( "split_part", new StandardSQLFunction( "split_part", StandardBasicTypes.STRING ) );
        registerFunction( "space", new StandardSQLFunction( "space", StandardBasicTypes.STRING ) );
        registerFunction( "rtrim", new StandardSQLFunction( "rtrim", StandardBasicTypes.STRING ) );
        registerFunction( "rpad", new StandardSQLFunction( "rpad", StandardBasicTypes.STRING ) );

        // Impala Miscellaneous Functions
        registerFunction( "effective_user", new StandardSQLFunction( "effective_user", StandardBasicTypes.STRING ) );
        registerFunction( "current_database", new StandardSQLFunction( "current_database", StandardBasicTypes.STRING ) );
        registerFunction( "version", new StandardSQLFunction( "version", StandardBasicTypes.STRING ) );
        registerFunction( "uuid", new StandardSQLFunction( "uuid", StandardBasicTypes.STRING ) );
        registerFunction( "user", new StandardSQLFunction( "user", StandardBasicTypes.STRING ) );
        registerFunction( "sleep", new StandardSQLFunction( "sleep", StandardBasicTypes.STRING ) );
        registerFunction( "logged_in_user", new StandardSQLFunction( "logged_in_user", StandardBasicTypes.STRING ) );
        registerFunction( "pid", new StandardSQLFunction( "pid", StandardBasicTypes.INTEGER ) );

        // Impala Aggregate Functions
        registerFunction( "min", new StandardSQLFunction( "min" ) );
        registerFunction( "max", new StandardSQLFunction( "max" ) );
        registerFunction( "appx_median", new StandardSQLFunction( "appx_median" ) );
        registerFunction( "sum", new StandardSQLFunction( "sum", StandardBasicTypes.DOUBLE ) );
        registerFunction( "stddev_samp", new StandardSQLFunction( "stddev_samp", StandardBasicTypes.DOUBLE ) );
        registerFunction( "stddev_pop", new StandardSQLFunction( "stddev_pop", StandardBasicTypes.DOUBLE ) );
        registerFunction( "stddev", new StandardSQLFunction( "stddev", StandardBasicTypes.DOUBLE ) );
        registerFunction( "ndv", new StandardSQLFunction( "ndv", StandardBasicTypes.DOUBLE ) );
        registerFunction( "avg", new StandardSQLFunction( "avg", StandardBasicTypes.DOUBLE ) );
        registerFunction( "count", new StandardSQLFunction( "count", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "group_concat", new StandardSQLFunction( "group_concat", StandardBasicTypes.STRING ) );

        getDefaultProperties().setProperty( Environment.STATEMENT_BATCH_SIZE, DEFAULT_BATCH_SIZE );

        limitHandler = new ImpalaLimitHandler();
    }

    @Override
    public String getAddColumnString() {
        return "add column";
    }

    @Override
    public boolean supportsLockTimeouts() {
        return false;
    }

    @Override
    public String getForUpdateString() {
        return "";
    }

    @Override
    public LimitHandler getLimitHandler() {
        return limitHandler;
    }

    @Override
    public boolean supportsLimit() {
        return true;
    }

    @Override
    public String getLimitString(String sql, boolean hasOffset) {
        return sql + (hasOffset ? " limit ? offset ? " : " limit ?");
    }

    @Override
    public boolean bindLimitParametersFirst() {
        return true;
    }

    @Override
    public boolean supportsIfExistsAfterTableName() {
        return false;
    }

    @Override
    public boolean supportsIfExistsBeforeTableName() {
        return true;
    }

    @Override
    public boolean supportsColumnCheck() {
        return true;
    }

    @Override
    public boolean supportsSequences() {
        return true;
    }

    @Override
    public boolean supportsPooledSequences() {
        return true;
    }

    @Override
    protected String getCreateSequenceString(String sequenceName) {
        return "create sequence " + sequenceName + " start with 1";
    }

    @Override
    protected String getCreateSequenceString(String sequenceName, int initialValue, int incrementSize) throws MappingException {
        if ( supportsPooledSequences() ) {
            return "create sequence " + sequenceName + " start with " + initialValue + " increment by " + incrementSize;
        }
        throw new MappingException( getClass().getName() + " does not support pooled sequences" );
    }

    @Override
    public SequenceInformationExtractor getSequenceInformationExtractor() {
        return SequenceInformationExtractorHSQLDBDatabaseImpl.INSTANCE;
    }

    @Override
    public String getSelectClauseNullString(int sqlType) {
        String literal;
        switch ( sqlType ) {
            case Types.BIGINT:
                literal = "cast(null as bigint)";
                break;
            case Types.BOOLEAN:
                literal = "cast(null as boolean)";
                break;
            case Types.CHAR:
                literal = "cast(null as string)";
                break;
            case Types.DECIMAL:
                literal = "cast(null as decimal)";
                break;
            case Types.DOUBLE:
                literal = "cast(null as double)";
                break;
            case Types.FLOAT:
                literal = "cast(null as float)";
                break;
            case Types.INTEGER:
                literal = "cast(null as int)";
                break;
            case Types.SMALLINT:
                literal = "cast(null as smallint)";
                break;
            case Types.VARCHAR:
                literal = "cast(null as string)";
                break;
            case Types.BLOB:
                literal = "cast(null as string)";
                break;
            case Types.CLOB:
                literal = "cast(null as string)";
                break;
            case Types.TINYINT:
                literal = "cast(null as tinyint)";
                break;
            case Types.TIMESTAMP:
                literal = "cast(null as timestamp)";
                break;
            default:
                literal = "cast(null as string)";
        }
        return literal;
    }

    @Override
    public boolean supportsUnionAll() {
        return true;
    }

    @Override
    public boolean supportsCurrentTimestampSelection() {
        return true;
    }

    @Override
    public boolean isCurrentTimestampSelectStringCallable() {
        return false;
    }

    @Override
    public String getCurrentTimestampSelectString() {
        return "select current_timestamp()";
    }

    @Override
    public String getCurrentTimestampSQLFunctionName() {
        return "current_timestamp";
    }

    @Override
    public boolean supportsCommentOn() {
        return true;
    }

    // Overridden informational metadata ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @Override
    public boolean supportsEmptyInList() {
        return false;
    }

    @Override
    public boolean requiresCastingOfParametersInSelectClause() {
        return true;
    }

    @Override
    public boolean doesReadCommittedCauseWritersToBlockReaders() {
        return true;
    }

    @Override
    public boolean doesRepeatableReadCauseReadersToBlockWriters() {
        return true;
    }

    @Override
    public boolean supportsLobValueChangePropogation() {
        return false;
    }

    @Override
    public String toBooleanValueString(boolean bool) {
        return String.valueOf( bool );
    }

    @Override
    public NameQualifierSupport getNameQualifierSupport() {
        return NameQualifierSupport.SCHEMA;
    }

    @Override
    public boolean supportsNamedParameters(DatabaseMetaData databaseMetaData) throws SQLException {
        return false;
    }

    @Override
    public boolean dropConstraints() {
        return false;
    }

    @Override
    public String getCascadeConstraintsString() {
        return " CASCADE ";
    }
}

 

4. Resolver 등록

 

이제 Custom Database, Dialect, Resolver는 모두 만들었습니다.

그럼 이 클래스들이 hibernate가 사용할 수 있도록 등록해주면 됩니다.

등록하는 코드는 아래와 같습니다.

Properties properties = new Properties();
properties.setProperty("hibernate.dialect_resolvers", "CustomResolver 경로");
localContainerEntityManagerFactoryBean.setJpaProperties(properties);

 

4. 마무리

이번 포스팅에서는 조회 용도의 Impala-Hibernate 조합을 알아보았습니다.

 

많지는 않겠지만 혹시 사용이 필요한 사람들을 위해 코드를 공유합니다.

 

감사합니다.

반응형

'BigData > Impala' 카테고리의 다른 글

Impala - Mybatis  (0) 2021.02.24
Impala  (1) 2021.01.02
반응형

1. 서론

이번 포스팅에서는 앱단에서 mybatis 를 통해 Impala를 사용하는 과정에서 생긴 문제점을 공유하려 합니다.

 

2. 문제점

impala 는 hadoop에 있는 다양한 데이터( = hdfs, hbase, kudu, etc) 들을 SQL을 통해 간편하고 빠르게 조회 및 처리할 수 있도록 도와주는 인터페이스입니다.

 

mybatis는 SQL 매퍼로서 앱단에서 dynamic query를 가능하게 해주며,

xml을 이용하여 SQL과 java와 같은 프로그래밍언어를 분리하도록 도와주는 라이브러리입니다.

 

impala와 mybatis 를 같이 사용하면 간편한 코드작성으로 impala를 통해 hadoop 데이터를 핸들링 할 수 있습니다.

 

하지만, 최신 impala와 mybatis 버전에는 문제점이 하나 있습니다.

바로, java의 LocalDateTime 필드 처리입니다.

 

먼저, mybatis를 이용하여 impala 데이터를 처리하는 과정을 설명하겠습니다.

 

1) LocalDateTimeTypeHandler

 

mybatis는 vo class의 필드를 보고, LocalDateTime 필드의 경우 LocalDateTimeTypeHandler 를 호출하게 됩니다.

public class LocalDateTimeTypeHandler extends BaseTypeHandler<LocalDateTime> {

  @Override
  public void setNonNullParameter(PreparedStatement ps, int i, LocalDateTime parameter, JdbcType jdbcType)
          throws SQLException {
    ps.setObject(i, parameter);
  }

  @Override
  public LocalDateTime getNullableResult(ResultSet rs, String columnName) throws SQLException {
    return rs.getObject(columnName, LocalDateTime.class); // 문제 부분입니다.
  }

  @Override
  public LocalDateTime getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
    return rs.getObject(columnIndex, LocalDateTime.class);
  }

  @Override
  public LocalDateTime getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
    return cs.getObject(columnIndex, LocalDateTime.class);
  }
}

 

코드에서 보이는것과 같이 LocalDateTimeTypeHandler 에서는 ResultSet의 getObject를 사용합니다.

여기서 이 ResultSet은 jdbc 라이브러리에 있는 인터페이스이며, impala는 cloudera 측에서 제공하는 impalaJdbc 라이브러리가 있습니다.

 

2) S41ForwardResultSet

 

cloudera에서 제공하는 jdbc 라이브러리의 ResultSet 구현 클래스입니다.

클래스에는 getObject 메서드가 아래와 같이 구현 되어있고, Util 클래스인 ResultSetUtilities 의 getObjectByType 메서드를 호출하는것을 볼 수 있습니다.

 

public <T> T getObject(int var1, Class<T> var2) throws SQLException {
    try {
        LogUtilities.logFunctionEntrance(this.m_logger, new Object[]{var1, var2});
        this.checkIfOpen();
        return ResultSetUtilities.getObjectByType(this, var1, var2);
    } catch (Exception var4) {
        throw ExceptionConverter.getInstance().toSQLException(var4, this.getWarningListener(), this.getLogger());
    }
}

 

3) ResultSetUtilities

 

아래는 getObjectByType 의 일부 코드입니다.

일부 코드만 봐도 알듯이, mybatis에서 전달한 Class타입을 가지고 알맞게 캐스팅하여 반환하는 로직입니다.

 

하지만, 이 if-else 문에는 아쉽게도 LocalDateTime.class 로 비교하는 구문이 없어 에러가 나게 됩니다.

if (var2.equals(String.class)) {
    return var0.getString(var1);
} else if (var2.equals(Time.class)) {
    return var0.getTime(var1);
} else if (var2.equals(Timestamp.class)) {
    return var0.getTimestamp(var1);

 

반응형

 

3. 해결법

1) cloudera 측 문의

 

소스 한줄추가하면 가능한거라 메일로 문의를 드렸습니다.

하지만, 외국 기업인것도 있으며 오픈소스가 아니기에 제공하는 jdbc에 대한 클라이언트 요청을 전문적으로 받아 픽스하려면

유료 사용을 검토해야만 했습니다.

 

하지만..... 단순히 한줄이면 되었기에 이대로 포기할 수 없었습니다.

 jdbc 라이브러리가 안되면 mybatis handler를 수정하기로 마음먹었습니다.

 

2) mybatis custom handler 등록

 

다행히도, mybatis에는 custom handler를 등록할 수 있도록 제공하고 있었습니다.

 

소스는 아래와 같이, sqlSesqlSessionFactoryBean 에 커스텀 핸들러를 등록하는겁니다.

sqlSessionFactoryBean.setTypeHandlersPackage("커스텀 핸들러 패키지 경로");

 

핸들러는 아래와 같이 등록하였습니다.

 

package 커스텀 핸들러 패키지 경로;

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public class LocalDateTimeTypeHandler extends BaseTypeHandler<LocalDateTime> {

    @Override
    public void setNonNullParameter(PreparedStatement ps, int i, LocalDateTime parameter, JdbcType jdbcType)
            throws SQLException {
        ps.setTimestamp(i, Timestamp.valueOf(parameter));
    }

    @Override
    public LocalDateTime getNullableResult(ResultSet rs, String columnName) throws SQLException {
        Timestamp timestamp = rs.getTimestamp(columnName);
        return getLocalDateTime(timestamp);
    }

    @Override
    public LocalDateTime getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
        Timestamp timestamp = rs.getTimestamp(columnIndex);
        return getLocalDateTime(timestamp);
    }

    @Override
    public LocalDateTime getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
        Timestamp timestamp = cs.getTimestamp(columnIndex);
        return getLocalDateTime(timestamp);
    }

    private static LocalDateTime getLocalDateTime(Timestamp timestamp) {
        if (timestamp != null) {
            return timestamp.toLocalDateTime();
        }
        return null;
    }
}

 

사실, 이 핸들러 코드는 mybatis 마이너버전의 소스입니다.

하지만, LocalDateTime 하나때문에 마이너한 mybatis 버전을 메이저로 올리지 못했기에 mybatis 버전은 메이저로, 문제가 되는 handler는 커스텀하여 해결하였습니다.

 

4. 마무리

apache impala 에서는 timestamp 컬럼을  java 언어에서 사용할 시 LocalDateTime을 사용하라고 권장하고 있습니다.

다만, jdbc에서는 아직 지원이 되지 않아 겪은 문제입니다.

 

하루빨리 jdbc 에도 LocalDateTime 지원이 되어 편하게 개발하는 날이 오길 기대하고 있습니다.

 

감사합니다.

 

 

반응형

'BigData > Impala' 카테고리의 다른 글

Impala - Hibernate  (1) 2021.02.24
Impala  (1) 2021.01.02
반응형

1. 서론

이번 포스팅에서는 Apache Impala에 대해 소개하려고 합니다.

 

Cloudera 도큐먼트를 보며 개인적으로 6.x 버전부터 Impala 와 Kudu 조합을 추천하고 있다고 느꼈습니다.
때문에, Hadoop에 관심이 있으신 분이라면 Impala와 Kudu에 대해 알아보는 것을 추천합니다.

물론, Impala는 Kudu가 아니더라도 File에 대해서도 SQL을 수행할 수 있는 서비스입니다.

 

2. Impala란?

임팔라는 Hadoop Eco에 저장되어 있는 데이터를 실시간 병렬 조회가 가능한 SQL 쿼리 엔진입니다.

 

Impala는 아래와 같이 병렬성을 가질수 있는 아키텍처로 이루어져 있습니다.

 

 

 

1) Impalad

 

Impalad는 impala daemon으로 실제 쿼리를 수행하는 역할을 담당합니다.

 

Impalad는 그림과 같이 Query Compiler, Query Coordinator, Query Executor로 이루어져 있습니다.

 

Compiler는 말그대로 client에서 요청받은 SQL 질의를 Compile 한 후 어떻게 처리할지 결정합니다.

Coordinator는 Compiler가 전달한 요청서를 받아 각각 다른 호스트에 있는 Impalad의 Executor에 요청합니다.

이는 데이터의 locality를 보장하며 이로인해 분산 질의 처리가 가능해지며 성능이 향상됩니다.

 

마지막으로 Executor는 실제로 자신이 담당하는 데이터에 대해서 SQL 질의를 수행하는 역할을 합니다.

질의 수행이 끝난 Executor는 Coordinator에게 결과를 반환합니다.

 

 

여기서, 눈치 채셨겠지만 각 Impalad는 metadata를 가지고 있습니다.

metadata는 필요한 데이터가 어느 호스트(노드)에 있는지에 대한 mapping 정보가 담겨져 있습니다.

 

Impalad가 metadata를 가지고 있기 때문에, Client의 요청은 모든 Impalad가 수신 할 수 있습니다.

 

 

2) StateStore

 

StateStore는 각 Impalad의 상태를 관리하며 데이터 일관성을 위해 Metadata의 변경 사항을 모든 Impalad에 브로드캐스팅하는 역할을 담당합니다.

 

StateStore는 Impala 서비스에서 없다고 동작이 안하진 않습니다.

다만, 클러스터 내에서 Impalad가 제외된 경우 쿼리 가능한 daemon 그룹에서 제외를 못하며, metadata 갱신이 안되는 경우가 발생할 수 있습니다.

 

때문에, 사실상 운영시 StateStore도 띄어야 합니다.

 

3) Catalog Service

 

Catalog Service는 Impala 쿼리로 데이터의 CUD 혹은 DDL 작업 시,

이를 impalad의 metadata에 반영하기 위해 StateStore에 브로드캐스팅해달라고 요청하는 역할을 담당합니다.

브로드캐스팅은 StateStore를 통해서 수행되므로 Catalog Service와 StateStore는 같은 호스트에서 수행하는 것이 좀 더 효율적입니다.

그림에서와 같이 Catalog Service도 metadata가 있으며, 이 metadata가 원본이라고 생각하시면 됩니다.

 

3. Impala 사용시 주의점

Impala는 HMS( Hive MetaStore ) 가 떠있어야 가능합니다.

 

그 이유는 Catalog Service가 가지고 있는 Metadate 관리는 실제로 HMS에서 관리하는 데이터를 복사한것이기 때문입니다.

 

때문에, Hive와 Impala를 같이 사용시 주의할 점이 생기게 됩니다.

 

주의할 점은 HiveQL로 인한 metadata 변경은 impala에서 알 수 없다는 점입니다.

HiveQL로 변경된 데이터는 HMS에는 반영되나 impala에서 원본 metadata를 관리하는 Catalog Service에는 반영이 안되기 때문입니다.

 

하지만, 위에서 말씀드린것처럼 Catalog Service의 metadata 은 HMS에서 Copy한 것입니다.

그러니, HiveQL로 변경된 부분을 Impala metadata에도 반영하고 싶은 경우에는 HMS에서 Copy를 다시 하면 됩니다.

 

다시 변경 부분만을 Copy 해와 Impalad에 브로드캐스팅할 수 있도록 Impala는 REFRESH, INVALIDATE METADATA 쿼리를 제공합니다.

 

그럼, Hive에서도 Impala 쿼리로 인한 변경사항을 반영해야하는 문제가 있을까요?

결국 Metadata의 원본은 HMS이며, Impala에서 변경된 부분은 HMS에도 반영됩니다.

또한, Hive는 매 질의시 HMS에서 Metadata를 조회해 처리하기 때문에 Hive는 REFRESH,INVALIDATE METADATA 와 같은 작업을 하지 않아도 됩니다.

 

반응형

 

4. Hive 와 Impala 차이점

Hive와 Impala의 차이점은 동작의 차이로 인해 발생합니다.

Hive는 SQL을 통해 편하게 MR을 수행하는 서비스입니다.

Impala는 MR로 동작하지 않습니다. 위에서 소개한것과 같이 Impala는 MR이 아닌 SQL 엔진을 통해 동작합니다.

 

이로인해, Impala는 Hive에 비해 처리속도가 빠릅니다.

그렇다고, Hive를 모두 Impala 로 대체하는것은 옳바르진 않습니다.

 

우선, Hive, Impala 모두 OLAP 적합한 서비스입니다. 다만, Impala는 SQL 처리가 메모리상에서 이루어집니다.

때문에, 호스트에 메모리가 부족하거나 메모리를 너무 과도하게 차지하는 쿼리의 경우에는

Hive를 사용하는것이 클러스터에 부담도 주지않으며 좋은 선택일 수 있습니다.

물론, Hive는 MR이기에 Spark로 대체하는 방법도 좋은 방안입니다.
하지만, Spark도 기본은 메모리 연산이기에 각 요구사항에 맞게 선택하여 사용해야 합니다.

 

또한, Hive, Impala 는 OLAP에 적합한 서비스라고 했습니다. 하지만 이는 Hdfs 파일에 대해서 쿼리를 날리는 경우입니다.

 

Impala 를 SQL 인터페이스로 사용하지만 실제 데이터는 Kudu 혹은 Hbase에 있다면, 쿼리는 Kudu, Hbase 특성에 맞게 작성해야합니다.

예를들어, Kudu의 경우에는 PK에 대해서 B+Tree로 index를 가지고 있기 때문에 Where 절에 PK 넣는 쿼리의 경우에는 데이터가 대용량이 아니더라도 응답속도가 빠르게 나오게 됩니다.

impalad 가 Kudu Master 혹은 Hbase Master에 요청하여 데이터를 가져오기 때문입니다.
이런 경우에는 metadata도 사실상 무의미하며 해당 데이터를 담당하는 Kudu, Hbase 영역으로 넘어가게 됩니다.

하지만, 이런 저장소에서 제공하는 별도의 클라이언트를 사용하지 않고 Impala 를 통해 간편하게 SQL로 데이터를 Handling할 수 있다는 점이 Impala의 장점입니다.

 

5. HAProxy

Client의 요청은 모든 Impalad가 수신 할 수 있다고 했습니다.

때문에, 관리자 입장에서는 하나의 Impalad에만 요청이 쏠리지 않도록 HAProxy를 구축해야 합니다.

 

HAProxy를 구축하면 얻는 이점은 아래와 같습니다.

 

  1. 부하 분산
  2. Client의 요청을 단일 endpoint 로 관리 가능
    1. Impalad가 추가된다면 HAProxy에 추가된 Impalad 호스트만 추가하면 됩니다.
Cloudera Impala Document 에서는 무료 오픈소스인 haproxy 를 소개하고 있습니다.

 

6. 마무리

이번 포스팅에서는 Impala에 대해 알아봤습니다.

 

부족한 설명이였지만 읽어주셔서 감사합니다.

반응형

'BigData > Impala' 카테고리의 다른 글

Impala - Hibernate  (1) 2021.02.24
Impala - Mybatis  (0) 2021.02.24

+ Recent posts