반응형

1. 서론

Kafka 를 사용하며 겪은 문제에 대해 설명하고 해결방안을 공유합니다.

 

2. 문제

Kafka 사용 중에 send한 데이터가 유실된 것을 발견되었습니다.

저는 acks 설정을 all로 했으며, send 후에는 수동으로 flush 까지 호출을 했습니다.

 

하지만 메시지는 유실이 되어 저는 매우 당황했습니다.

 

문제 파악을 위해 Kafka Producer의 flush 메서드를 살펴봤습니다.

 

 

코드를 살펴보니 요청이 실패하면 record는 유실이 될수도 있다고 친절히 나와있네요.. OMG

 

그럼, 실패로 간주하는 경우는 언제 일까요?

 

Producer 

 

먼저 Producer의 내부 동작을 알아보겠습니다.

 

Producer는 내부적으로 아래와 같이 동작합니다.

 

1. Kafka Producer send => accumulator 버퍼에 append

2. linger.ms 와 batch.size 설정을 통해 flush => 실제 broker에 쓰기 요청

3. 이때 acks 설정에 따라 응답을 받게되며, 실패 시 retries 와 delivery.timeout.ms 설정으로 재시도 여부를 판단.

 

아래는 KafkaProducer가 실제 send를 하기위해 사용하는 Sender 클래스의 일부입니다.

 

 

broker에게서 error를 받은 경우 canRetry 메서드를 호출합니다.

canRetry는 재시도가 가능한지 판단하는 메서드입니다.

아래 그림과 같이 retries, 와 delivery.timeout.ms 설정에 따라 boolean 값을 반환합니다.

 

그럼, retries와 delivery.timeout.ms default 값을 알아보겠습니다.

 

retries는 Integer.MAX로 2147483647 입니다.

delivery.timeout.ms120000 로 2분 입니다.

 

즉, 2분 동안 2147483647 횟수만큼은 재시도를 한다가 되는데요.

 

이 조건에도 실패가 되는 경우에는 사용자 정의 callback 함수를 통해 재처리하는 방향으로 구현을 해야합니다.

 

 

반응형

 

3. 해결 방안

저의 경우 실패한 이유는 broker 측에서 timeout 내에 replica가 이루어지지 않아서 인데요.

 

위와 같은 경우에는 브로커 성능상 이슈로 아래와 같은 해결방안들이 있을 수 있습니다.

 

  1. 토픽의 replica 설정 값을 내리는 방법(reduce topic's replica-factor config value)
  2. 브로커 설정 중 num.network.threads and num.io.threads 의 값을 늘리는 방법  (Increase the value of num.network.threads and num.io.threads among broker settings)
  3. broker 서버의 cgroup memory limit을 늘려, follower들의 fetch write시 disk io 를 감소시켜 복제 속도를 증가시키는 방법
  4. 2번의 broker 설정 보다 producer의 병렬 처리 갯수가 과도하게 많은지 체크.
    1. 많다면, broker 의 cpu, memory 사용률이 surge하지 않는 선에서 producer와 broker 설정 조절

4. 마무리

위 내용에서 본 것처럼, Kafka 는 여러 Config 를 제공하고 있습니다. 

여러분은 Kafka 내부동작을 자세히 알고 비즈니스 요구사항에 맞게 설정값을 세팅해서 사용하도록 해야합니다.

 

이 글이 여러분의 설정 값 세팅에 기여했으면 좋겠습니다.

 

감사합니다.

반응형
반응형

1. 서론

저번 포스팅에서는 impala- mybatis 사용에 대해서 알아보았습니다.

 

그럼, impala는 SQL 지원이 되고 relation이 있으니 ORM 기술인 hibernate도 사용할 수 있지 않나? 라고 생각할 수 있습니다.

 

이 질문은 반은 맞고, 반은 틀리다고 할 수 있습니다.

이유는, hibernate 이념과 impala 이념이 다르기 때문입니다.

 

hibernate는 기본적으로 트랜잭션이 가능한 저장소 를 지원하며, 데이터의 CUD는 무조건 트랜잭션을 통해 이루어집니다.

 

하지만, impala는 하나의 데이터 저장소가 아닌 hdfs, kudu, hbase와 같이 여러 저장소를 지원하는 SQL 인터페이스이며,

가장 중요한 트랜잭션이 없습니다. 게다가 hdfs의 경우에는 delete, update는 되지도 않습니다.

 

하둡의 hbase, kudu 등등은 저장소라고 볼 수 있지만, 다른 dbms들과 같이 트랜잭션개념은 없습니다.
이런 저장소 프로젝트들은 자체 클라이언트를 제공하고 있으며 실패에 대한 처리는 개발자의 몫으로 넘깁니다.

물론, atomic 특성을 제공하는 저장소도 있지만 이것은 트랜잭션이랑은 별개의 내용입니다.

 

하지만, 조회의 경우에는 말이 달라집니다. 일반적으로 조회는 트랜잭션이 필요없습니다.

게다가, 제가 일하는 조직에서는 DB의 실시간 백업용으로 ROS용 Kudu를 사용하고 있습니다.

 

저장소 자체가 ROS이기에 어플리케이션 단에서는 조회만을 하게 되고, hibernate의 엔티티 관계에 대한 이점 을 누릴 수 있습니다.

 

2. 문제점

하지만, Dialect 관련 문제가 하나 있었습니다.

 

hibernate는 기본적으로 Dialect(= 방언)를 설정하지 않으면,

jdbc를 통해 알맞은 데이터베이스가 무엇이고 데이터베이스에 맞는 Dialect을 찾아 세팅해줍니다.

 

jdbc 로부터 데이터베이스를 찾아 Dialect을 세팅하는 과정은 아래와 같습니다.

 

1) DialectResolverInitiator

 

hibernate에서는 Dialect 결정을 위한 서비스 클래스로 DialectResolverInitiator 클래스의 initiateService 메서드를 호출합니다.

 

아래는 DialectResolverInitiator 클래스의 initiateService 메서드입니다.

 

@Override
public DialectResolver initiateService(Map configurationValues, ServiceRegistryImplementor registry) {
	final DialectResolverSet resolver = new DialectResolverSet();

	applyCustomerResolvers( resolver, registry, configurationValues );
	resolver.addResolver( new StandardDialectResolver() );
    
	return resolver;
}

 

코드에서 보이는것과 같이 customResolver 제외하고, 기본 Resolver로 StandardDialectResolver 를 사용합니다.

 

 

2) StandardDialectResolver

 

아래는 StandardDialectResolver 클래스 코드입니다.

hibernate-core에 있는 Database enum 클래스를 iterate돌며 jdbc에 알맞은 dialect를 찾는 로직입니다.

public final class StandardDialectResolver implements DialectResolver {

	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {

		for ( Database database : Database.values() ) {
			Dialect dialect = database.resolveDialect( info );
			if ( dialect != null ) {
				return dialect;
			}
		}

		return null;
	}
}

 

3) DataBase

 

아래는 DataBase enum 클래스의 일부 코드입니다.

코드에서 보이듯이 각 Database에 대해서 정의가 되어 있으며, 인자로 받은 DialectResolutionInfo info 를 통해 맞는 Database인지 확인하는 로직이 있습니다.

 

DialectResolutionInfo 에서 사용하는 getDatabaseName, getDatabaseMajorVersion 등의 메서드는 jdbc의 DatabaseMetaData interface 구현체 메서드를 사용하도록 되어있습니다.

 

ORACLE {
	@Override
	public Class<? extends Dialect> latestDialect() {return Oracle12cDialect.class;}

	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {
		final String databaseName = info.getDatabaseName();

		if ( "Oracle".equals( databaseName ) ) {
			final int majorVersion = info.getDatabaseMajorVersion();

			switch ( majorVersion ) {
				case 12:
					return new Oracle12cDialect();
				case 11:
					// fall through
				case 10:
					return new Oracle10gDialect();
				case 9:
					return new Oracle9iDialect();
				case 8:
					return new Oracle8iDialect();
				default:
					return latestDialectInstance( this );
			}
		}

		return null;
	}
},
POINTBASE {
	@Override
	public Class<? extends Dialect> latestDialect() {
		return PointbaseDialect.class;
	}
	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {
		return null;
	}
},
POSTGRESQL {
	@Override
	public Class<? extends Dialect> latestDialect() {
		return PostgreSQL10Dialect.class;
	}
	@Override
	public Dialect resolveDialect(DialectResolutionInfo info) {
		final String databaseName = info.getDatabaseName();
		if ( "PostgreSQL".equals( databaseName ) ) {
			final int majorVersion = info.getDatabaseMajorVersion();
			final int minorVersion = info.getDatabaseMinorVersion();
			if ( majorVersion < 8 ) {
				return new PostgreSQL81Dialect();
			}
			if ( majorVersion == 8 ) {
				return minorVersion >= 2 ? new PostgreSQL82Dialect() : new PostgreSQL81Dialect();
			}
			if ( majorVersion == 9 ) {
				if ( minorVersion < 2 ) {
					return new PostgreSQL9Dialect();
				}
				else if ( minorVersion < 4 ) {
					return new PostgreSQL92Dialect();
				}
				else if ( minorVersion < 5 ) {
					return new PostgreSQL94Dialect();
				}
				else {
					return new PostgreSQL95Dialect();
				}
			}
			return latestDialectInstance( this );
		}
		return null;
	}
},

 


하지만 서론에서 말씀드린것과 같이 hibernate 이념과 impala는 맞지 않아,

Database enum 클래스에는 IMPALA 가 없을 뿐더러 ImpalaDialect도 존재하지 않습니다.

위에 보여드린 3개 클래스는 모두 hibernate-core lib에 있습니다.

 

 

반응형

3. 해결법

1) Hibernate PR

 

위 이슈와 관련해서, hibernate 쪽에 PR을 올려둔 상태입니다.

하지만, hibernate의 이념과 맞지 않고, 너무 특정케이스를 위해서이기 때문에 PR에 대한 approve가 될지는 모르겠습니다.....ㅠ

소스 변경이 아닌 추가여서, 장애 포인트는 없지만 이념과 다르다는 피드백을 받을거 같네요....ㅎㅎ

 

2) Custom resolver, database, dialect 등록

 

그래도 사용은 할 수 있습니다.

바로 위에서 살펴본 DialectResolverInitiator 클래스에 customResolver를 등록하는 코드가 있기 때문입니다!!

 

혹시나, 저와 같이 필요한 경우가 있다면 아래와 같이 커스텀한 resolver, database, dialect를 사용하여

조회용도의 impala-hibernate 조합을 사용할 수 있습니다.

 

1. Custom Resolver

 

커스텀한 Resolver를 만들어줍니다.

 

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public class CustomDialectResolver implements DialectResolver {

    @Override
    public Dialect resolveDialect(DialectResolutionInfo info) {
        for ( ImpalaDatabase database : ImpalaDatabase.values() ) {
            Dialect dialect = database.resolveDialect( info );
            if ( dialect != null ) {
                return dialect;
            }
        }
        return null;
    }
}

 

2. Custom Database 

 

Resolver에서 사용하는 Database enum 클래스를 만들어 줍니다.

코드에서 알 수 있듯이, ImpalaJdbc는 getDatabaseName 메서드로 Impala 를 반환합니다.

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public enum ImpalaDatabase {

    IMPALA {
        @Override
        public Class<? extends Dialect> latestDialect() {
            return ImpalaDialect.class;
        }

        @Override
        public Dialect resolveDialect(DialectResolutionInfo info) {
            final String databaseName = info.getDatabaseName();
            if ("Impala".equals(databaseName)) {
                return latestDialectInstance(this);
            }
            return null;
        }
    };

    public abstract Class<? extends Dialect> latestDialect();

    public abstract Dialect resolveDialect(DialectResolutionInfo info);

    private static Dialect latestDialectInstance(ImpalaDatabase database) {
        try {
            return database.latestDialect().newInstance();
        }
        catch (InstantiationException | IllegalAccessException e) {
            throw new HibernateException( e );
        }
    }
}

 

3. Custom Dialect

 

Database에서 반환할 Dialect를 만듭니다.

해당 Dialect은 Cloudera Document - SQL Reference를 참고하여 만들었습니다.

모든 function, dml, ddl에 대해서 확인하는데에 한계가 있어 틀린부분이 있을 수 있지만
애초에 Custom Dialect이기 때문에 해당 부분은 fix하여 사용하면 됩니다.

 

import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.sql.Types;
import org.hibernate.MappingException;
import org.hibernate.cfg.Environment;
import org.hibernate.dialect.Dialect;
import org.hibernate.dialect.function.StandardSQLFunction;
import org.hibernate.dialect.pagination.AbstractLimitHandler;
import org.hibernate.dialect.pagination.LimitHandler;
import org.hibernate.dialect.pagination.LimitHelper;
import org.hibernate.engine.jdbc.env.spi.NameQualifierSupport;
import org.hibernate.engine.spi.RowSelection;
import org.hibernate.tool.schema.extract.internal.SequenceInformationExtractorHSQLDBDatabaseImpl;
import org.hibernate.tool.schema.extract.spi.SequenceInformationExtractor;
import org.hibernate.type.StandardBasicTypes;

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public class ImpalaDialect extends Dialect {

    private final class ImpalaLimitHandler extends AbstractLimitHandler {
        @Override
        public String processSql(String sql, RowSelection selection) {
            final boolean hasOffset = LimitHelper.hasFirstRow( selection );
            return sql + (hasOffset ? " limit ? offset ?" : " limit ?");
        }

        @Override
        public boolean supportsLimit() {
            return true;
        }

        @Override
        public boolean bindLimitParametersFirst() {
            return true;
        }
    }

    private final LimitHandler limitHandler;


    /**
     * Constructs a ImpalaDialect
     */
    public ImpalaDialect() {
        super();

        registerColumnType( Types.BIGINT, "bigint" );
        registerColumnType( Types.BOOLEAN, "boolean" );
        registerColumnType( Types.CHAR, "char($l)" );
        registerColumnType( Types.DECIMAL, "decimal($p,$s)" );
        registerColumnType( Types.DOUBLE, "double" );
        registerColumnType( Types.FLOAT, "float" );
        registerColumnType( Types.INTEGER, "int" );
        registerColumnType( Types.SMALLINT, "smallint" );
        registerColumnType( Types.VARCHAR, "string" );
        registerColumnType( Types.BLOB, "string" );
        registerColumnType( Types.CLOB, "string" );
        registerColumnType( Types.TINYINT, "tinyint" );
        registerColumnType( Types.TIMESTAMP, "timestamp" );

        // Impala Mathematical Functions
        registerFunction( "abs", new StandardSQLFunction("abs") );
        registerFunction( "acos", new StandardSQLFunction( "acos", StandardBasicTypes.DOUBLE ) );
        registerFunction( "asin", new StandardSQLFunction( "asin", StandardBasicTypes.DOUBLE ) );
        registerFunction( "atan", new StandardSQLFunction("atan", StandardBasicTypes.DOUBLE) );
        registerFunction( "atan2", new StandardSQLFunction("atan2", StandardBasicTypes.DOUBLE) );
        registerFunction( "bin", new StandardSQLFunction( "bin", StandardBasicTypes.STRING ) );
        registerFunction( "ceil", new StandardSQLFunction("ceil") );
        registerFunction( "dceil", new StandardSQLFunction("dceil") );
        registerFunction( "ceiling", new StandardSQLFunction("ceiling") );
        registerFunction( "conv", new StandardSQLFunction("conv") );
        registerFunction( "cos", new StandardSQLFunction( "cos", StandardBasicTypes.DOUBLE ) );
        registerFunction( "cot", new StandardSQLFunction( "cot", StandardBasicTypes.DOUBLE ) );
        registerFunction( "cosh", new StandardSQLFunction( "cosh", StandardBasicTypes.DOUBLE ) );
        registerFunction( "degrees", new StandardSQLFunction( "degrees", StandardBasicTypes.DOUBLE ) );
        registerFunction( "e", new StandardSQLFunction( "e", StandardBasicTypes.DOUBLE ) );
        registerFunction( "exp", new StandardSQLFunction( "exp", StandardBasicTypes.DOUBLE ) );
        registerFunction( "factorial", new StandardSQLFunction( "factorial", StandardBasicTypes.INTEGER) );
        registerFunction( "floor", new StandardSQLFunction( "floor", StandardBasicTypes.INTEGER ) );
        registerFunction( "dfloor", new StandardSQLFunction( "dfloor", StandardBasicTypes.INTEGER ) );
        registerFunction( "fmod", new StandardSQLFunction("fmod") );
        registerFunction( "fnv_hash", new StandardSQLFunction( "fnv_hash", StandardBasicTypes.INTEGER ) );
        registerFunction( "greatest", new StandardSQLFunction("greatest") );
        registerFunction( "hex", new StandardSQLFunction( "hex", StandardBasicTypes.STRING ) );
        registerFunction( "is_inf", new StandardSQLFunction( "is_inf", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "is_nan", new StandardSQLFunction( "is_nan", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "least", new StandardSQLFunction("least") );
        registerFunction( "ln", new StandardSQLFunction( "ln", StandardBasicTypes.DOUBLE ) );
        registerFunction( "log", new StandardSQLFunction( "log", StandardBasicTypes.DOUBLE ) );
        registerFunction( "log10", new StandardSQLFunction( "log10", StandardBasicTypes.DOUBLE ) );
        registerFunction( "log2", new StandardSQLFunction( "log2", StandardBasicTypes.DOUBLE ) );
        registerFunction( "max_int", new StandardSQLFunction( "max_int", StandardBasicTypes.INTEGER ) );
        registerFunction( "max_bigint", new StandardSQLFunction( "max_bigint", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "max_smallint", new StandardSQLFunction( "max_smallint", StandardBasicTypes.INTEGER ) );
        registerFunction( "max_tinyint", new StandardSQLFunction( "max_tinyint", StandardBasicTypes.INTEGER ) );
        registerFunction( "min_int", new StandardSQLFunction( "min_int", StandardBasicTypes.INTEGER ) );
        registerFunction( "min_bigint", new StandardSQLFunction( "min_bigint", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "min_smallint", new StandardSQLFunction( "min_smallint", StandardBasicTypes.INTEGER ) );
        registerFunction( "min_tinyint", new StandardSQLFunction( "min_tinyint", StandardBasicTypes.INTEGER ) );
        registerFunction( "mod", new StandardSQLFunction( "mod" ) );
        registerFunction( "murmur_hash", new StandardSQLFunction( "murmur_hash", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "negative", new StandardSQLFunction( "negative" ) );
        registerFunction( "pi", new StandardSQLFunction( "pi", StandardBasicTypes.DOUBLE ) );
        registerFunction( "pmod", new StandardSQLFunction( "pmod" ) );
        registerFunction( "positive", new StandardSQLFunction( "positive" ) );
        registerFunction( "pow", new StandardSQLFunction( "pow", StandardBasicTypes.DOUBLE ) );
        registerFunction( "power", new StandardSQLFunction( "power", StandardBasicTypes.DOUBLE ) );
        registerFunction( "dpow", new StandardSQLFunction( "dpow", StandardBasicTypes.DOUBLE ) );
        registerFunction( "fpow", new StandardSQLFunction( "fpow", StandardBasicTypes.DOUBLE ) );
        registerFunction( "precision", new StandardSQLFunction( "precision", StandardBasicTypes.INTEGER ) );
        registerFunction( "quotient", new StandardSQLFunction( "quotient", StandardBasicTypes.INTEGER ) );
        registerFunction( "radians", new StandardSQLFunction( "radians", StandardBasicTypes.DOUBLE ) );
        registerFunction( "rand", new StandardSQLFunction( "rand", StandardBasicTypes.DOUBLE ) );
        registerFunction( "random", new StandardSQLFunction( "random", StandardBasicTypes.DOUBLE ) );
        registerFunction( "round", new StandardSQLFunction( "round" ) );
        registerFunction( "dround", new StandardSQLFunction( "dround" ) );
        registerFunction( "scale", new StandardSQLFunction( "scale", StandardBasicTypes.INTEGER ) );
        registerFunction( "sign", new StandardSQLFunction( "sign", StandardBasicTypes.INTEGER ) );
        registerFunction( "sin", new StandardSQLFunction( "sin", StandardBasicTypes.DOUBLE ) );
        registerFunction( "sinh", new StandardSQLFunction( "sinh", StandardBasicTypes.DOUBLE ) );
        registerFunction( "sqrt", new StandardSQLFunction( "sqrt", StandardBasicTypes.DOUBLE ) );
        registerFunction( "tan", new StandardSQLFunction( "tan", StandardBasicTypes.DOUBLE ) );
        registerFunction( "tanh", new StandardSQLFunction("tanh", StandardBasicTypes.DOUBLE) );
        registerFunction( "truncate", new StandardSQLFunction( "truncate" ) );
        registerFunction( "dtrunc", new StandardSQLFunction( "dtrunc" ) );
        registerFunction( "trunc", new StandardSQLFunction( "trunc" ) );
        registerFunction( "unhex", new StandardSQLFunction( "unhex", StandardBasicTypes.STRING ) );
        registerFunction( "width_bucket", new StandardSQLFunction( "width_bucket" ) );

        // Impala Bit Functions
        registerFunction( "bitand", new StandardSQLFunction( "bitand" ) );
        registerFunction( "bitor", new StandardSQLFunction( "bitor" ) );
        registerFunction( "bitnot", new StandardSQLFunction( "bitnot" ) );
        registerFunction( "bitxor", new StandardSQLFunction( "bitxor" ) );
        registerFunction( "countset", new StandardSQLFunction( "countset" ) );
        registerFunction( "getbit", new StandardSQLFunction( "getbit" ) );
        registerFunction( "rotateleft", new StandardSQLFunction( "rotateleft" ) );
        registerFunction( "rotateright", new StandardSQLFunction( "rotateright" ) );
        registerFunction( "setbit", new StandardSQLFunction( "setbit" ) );
        registerFunction( "shiftleft", new StandardSQLFunction( "shiftleft" ) );
        registerFunction( "shiftright", new StandardSQLFunction( "shiftright" ) );

        // Impala Date and Time Functions
        registerFunction( "add_months", new StandardSQLFunction( "add_months", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "adddate", new StandardSQLFunction( "adddate", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "current_timestamp", new StandardSQLFunction( "current_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_add", new StandardSQLFunction( "date_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_part", new StandardSQLFunction( "date_part", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_sub", new StandardSQLFunction( "date_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "date_trunc", new StandardSQLFunction( "date_trunc", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "datediff", new StandardSQLFunction( "datediff", StandardBasicTypes.INTEGER ) );
        registerFunction( "day", new StandardSQLFunction( "day", StandardBasicTypes.INTEGER ) );
        registerFunction( "dayname", new StandardSQLFunction( "dayname", StandardBasicTypes.STRING ) );
        registerFunction( "dayofweek", new StandardSQLFunction( "dayofweek", StandardBasicTypes.INTEGER ) );
        registerFunction( "dayofmonth", new StandardSQLFunction( "dayofmonth", StandardBasicTypes.INTEGER ) );
        registerFunction( "dayofyear", new StandardSQLFunction( "dayofyear", StandardBasicTypes.INTEGER ) );
        registerFunction( "days_add", new StandardSQLFunction( "days_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "days_sub", new StandardSQLFunction( "days_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "extract", new StandardSQLFunction( "extract", StandardBasicTypes.INTEGER ) );
        registerFunction( "from_timestamp", new StandardSQLFunction( "from_timestamp", StandardBasicTypes.STRING ) );
        registerFunction( "from_unixtime", new StandardSQLFunction( "from_unixtime", StandardBasicTypes.STRING ) );
        registerFunction( "from_utc_timestamp", new StandardSQLFunction( "from_utc_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "hour", new StandardSQLFunction( "hour", StandardBasicTypes.INTEGER ) );
        registerFunction( "hours_add", new StandardSQLFunction( "hours_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "hours_sub", new StandardSQLFunction( "hours_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "int_months_between", new StandardSQLFunction( "int_months_between", StandardBasicTypes.INTEGER ) );
        registerFunction( "microseconds_add", new StandardSQLFunction( "microseconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "microseconds_sub", new StandardSQLFunction( "microseconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "millisecond", new StandardSQLFunction( "millisecond", StandardBasicTypes.INTEGER ) );
        registerFunction( "milliseconds_add", new StandardSQLFunction( "milliseconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "milliseconds_sub", new StandardSQLFunction( "milliseconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "minute", new StandardSQLFunction( "minute", StandardBasicTypes.INTEGER ) );
        registerFunction( "minutes_add", new StandardSQLFunction( "minutes_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "minutes_sub", new StandardSQLFunction( "minutes_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "month", new StandardSQLFunction( "month", StandardBasicTypes.INTEGER ) );
        registerFunction( "monthname", new StandardSQLFunction( "monthname", StandardBasicTypes.STRING ) );
        registerFunction( "months_add", new StandardSQLFunction( "months_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "months_between", new StandardSQLFunction( "months_between", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "months_sub", new StandardSQLFunction( "months_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "nanoseconds_add", new StandardSQLFunction( "nanoseconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "nanoseconds_sub", new StandardSQLFunction( "nanoseconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "next_day", new StandardSQLFunction( "next_day", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "now", new StandardSQLFunction( "now", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "quarter", new StandardSQLFunction( "quarter", StandardBasicTypes.INTEGER ) );
        registerFunction( "second", new StandardSQLFunction( "second", StandardBasicTypes.INTEGER ) );
        registerFunction( "seconds_add", new StandardSQLFunction( "seconds_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "seconds_sub", new StandardSQLFunction( "seconds_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "subdate", new StandardSQLFunction( "subdate", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "timeofday", new StandardSQLFunction( "timeofday", StandardBasicTypes.STRING ) );
        registerFunction( "timestamp_cmp", new StandardSQLFunction( "timestamp_cmp", StandardBasicTypes.INTEGER ) );
        registerFunction( "to_date", new StandardSQLFunction( "to_date", StandardBasicTypes.STRING ) );
        registerFunction( "to_timestamp", new StandardSQLFunction( "to_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "to_utc_timestamp", new StandardSQLFunction( "to_utc_timestamp", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "unix_timestamp", new StandardSQLFunction( "unix_timestamp", StandardBasicTypes.INTEGER ) );
        registerFunction( "utc_timestamp", new StandardSQLFunction( "utc_timestamp", StandardBasicTypes.INTEGER ) );
        registerFunction( "weekofyear", new StandardSQLFunction( "weekofyear", StandardBasicTypes.INTEGER ) );
        registerFunction( "weeks_add", new StandardSQLFunction( "weeks_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "weeks_sub", new StandardSQLFunction( "weeks_sub", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "year", new StandardSQLFunction( "year", StandardBasicTypes.INTEGER ) );
        registerFunction( "years_add", new StandardSQLFunction( "years_add", StandardBasicTypes.TIMESTAMP ) );
        registerFunction( "years_sub", new StandardSQLFunction( "years_sub", StandardBasicTypes.TIMESTAMP ) );

        // Impala Conditional Functions
        registerFunction( "coalesce", new StandardSQLFunction( "coalesce" ) );
        registerFunction( "decode", new StandardSQLFunction( "decode" ) );
        registerFunction( "if", new StandardSQLFunction( "if" ) );
        registerFunction( "ifnull", new StandardSQLFunction( "ifnull" ) );
        registerFunction( "isfalse", new StandardSQLFunction( "isfalse", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "nullvalue", new StandardSQLFunction( "nullvalue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "nonnullvalue", new StandardSQLFunction( "nonnullvalue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "istrue", new StandardSQLFunction( "istrue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "isnotfalse", new StandardSQLFunction( "isnotfalse", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "isnottrue", new StandardSQLFunction( "isnottrue", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "zeroifnull", new StandardSQLFunction( "zeroifnull" ) );
        registerFunction( "nvl2", new StandardSQLFunction( "nvl2" ) );
        registerFunction( "nvl", new StandardSQLFunction( "nvl" ) );
        registerFunction( "nullifzero", new StandardSQLFunction( "nullifzero" ) );
        registerFunction( "nullif", new StandardSQLFunction( "nullif" ) );
        registerFunction( "isnull", new StandardSQLFunction( "isnull" ) );

        // Impala String Functions
        registerFunction( "ascii", new StandardSQLFunction( "ascii", StandardBasicTypes.INTEGER ) );
        registerFunction( "base64encode", new StandardSQLFunction( "base64encode", StandardBasicTypes.STRING ) );
        registerFunction( "base64decode", new StandardSQLFunction( "base64decode", StandardBasicTypes.STRING ) );
        registerFunction( "left", new StandardSQLFunction( "left", StandardBasicTypes.STRING ) );
        registerFunction( "initcap", new StandardSQLFunction( "initcap", StandardBasicTypes.STRING ) );
        registerFunction( "group_concat", new StandardSQLFunction( "group_concat", StandardBasicTypes.STRING ) );
        registerFunction( "concat_ws", new StandardSQLFunction( "concat_ws", StandardBasicTypes.STRING ) );
        registerFunction( "concat", new StandardSQLFunction( "concat", StandardBasicTypes.STRING ) );
        registerFunction( "chr", new StandardSQLFunction( "chr", StandardBasicTypes.STRING ) );
        registerFunction( "btrim", new StandardSQLFunction( "btrim", StandardBasicTypes.STRING ) );
        registerFunction( "instr", new StandardSQLFunction( "instr", StandardBasicTypes.INTEGER ) );
        registerFunction( "find_in_set", new StandardSQLFunction( "find_in_set", StandardBasicTypes.INTEGER ) );
        registerFunction( "char_length", new StandardSQLFunction( "char_length", StandardBasicTypes.INTEGER ) );
        registerFunction( "levenshtein", new StandardSQLFunction( "levenshtein", StandardBasicTypes.INTEGER ) );
        registerFunction( "length", new StandardSQLFunction( "length", StandardBasicTypes.INTEGER ) );
        registerFunction( "locate", new StandardSQLFunction( "locate", StandardBasicTypes.INTEGER ) );
        registerFunction( "lcase", new StandardSQLFunction( "lcase", StandardBasicTypes.STRING ) );
        registerFunction( "lower", new StandardSQLFunction( "lower", StandardBasicTypes.STRING ) );
        registerFunction( "right", new StandardSQLFunction( "right", StandardBasicTypes.STRING ) );
        registerFunction( "reverse", new StandardSQLFunction( "reverse", StandardBasicTypes.STRING ) );
        registerFunction( "replace", new StandardSQLFunction( "replace", StandardBasicTypes.STRING ) );
        registerFunction( "repeat", new StandardSQLFunction( "repeat", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_replace", new StandardSQLFunction( "regexp_replace", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_extract", new StandardSQLFunction( "regexp_extract", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_escape", new StandardSQLFunction( "regexp_escape", StandardBasicTypes.STRING ) );
        registerFunction( "parse_url", new StandardSQLFunction( "parse_url", StandardBasicTypes.STRING ) );
        registerFunction( "ltrim", new StandardSQLFunction( "ltrim", StandardBasicTypes.STRING ) );
        registerFunction( "lpad", new StandardSQLFunction( "lpad", StandardBasicTypes.STRING ) );
        registerFunction( "regexp_like", new StandardSQLFunction( "regexp_like", StandardBasicTypes.BOOLEAN ) );
        registerFunction( "ucase", new StandardSQLFunction( "ucase", StandardBasicTypes.STRING ) );
        registerFunction( "upper", new StandardSQLFunction( "upper", StandardBasicTypes.STRING ) );
        registerFunction( "trim", new StandardSQLFunction( "trim", StandardBasicTypes.STRING ) );
        registerFunction( "translate", new StandardSQLFunction( "translate", StandardBasicTypes.STRING ) );
        registerFunction( "substring", new StandardSQLFunction( "substring", StandardBasicTypes.STRING ) );
        registerFunction( "substr", new StandardSQLFunction( "substr", StandardBasicTypes.STRING ) );
        registerFunction( "strright", new StandardSQLFunction( "strright", StandardBasicTypes.STRING ) );
        registerFunction( "strleft", new StandardSQLFunction( "strleft", StandardBasicTypes.STRING ) );
        registerFunction( "split_part", new StandardSQLFunction( "split_part", StandardBasicTypes.STRING ) );
        registerFunction( "space", new StandardSQLFunction( "space", StandardBasicTypes.STRING ) );
        registerFunction( "rtrim", new StandardSQLFunction( "rtrim", StandardBasicTypes.STRING ) );
        registerFunction( "rpad", new StandardSQLFunction( "rpad", StandardBasicTypes.STRING ) );

        // Impala Miscellaneous Functions
        registerFunction( "effective_user", new StandardSQLFunction( "effective_user", StandardBasicTypes.STRING ) );
        registerFunction( "current_database", new StandardSQLFunction( "current_database", StandardBasicTypes.STRING ) );
        registerFunction( "version", new StandardSQLFunction( "version", StandardBasicTypes.STRING ) );
        registerFunction( "uuid", new StandardSQLFunction( "uuid", StandardBasicTypes.STRING ) );
        registerFunction( "user", new StandardSQLFunction( "user", StandardBasicTypes.STRING ) );
        registerFunction( "sleep", new StandardSQLFunction( "sleep", StandardBasicTypes.STRING ) );
        registerFunction( "logged_in_user", new StandardSQLFunction( "logged_in_user", StandardBasicTypes.STRING ) );
        registerFunction( "pid", new StandardSQLFunction( "pid", StandardBasicTypes.INTEGER ) );

        // Impala Aggregate Functions
        registerFunction( "min", new StandardSQLFunction( "min" ) );
        registerFunction( "max", new StandardSQLFunction( "max" ) );
        registerFunction( "appx_median", new StandardSQLFunction( "appx_median" ) );
        registerFunction( "sum", new StandardSQLFunction( "sum", StandardBasicTypes.DOUBLE ) );
        registerFunction( "stddev_samp", new StandardSQLFunction( "stddev_samp", StandardBasicTypes.DOUBLE ) );
        registerFunction( "stddev_pop", new StandardSQLFunction( "stddev_pop", StandardBasicTypes.DOUBLE ) );
        registerFunction( "stddev", new StandardSQLFunction( "stddev", StandardBasicTypes.DOUBLE ) );
        registerFunction( "ndv", new StandardSQLFunction( "ndv", StandardBasicTypes.DOUBLE ) );
        registerFunction( "avg", new StandardSQLFunction( "avg", StandardBasicTypes.DOUBLE ) );
        registerFunction( "count", new StandardSQLFunction( "count", StandardBasicTypes.BIG_INTEGER ) );
        registerFunction( "group_concat", new StandardSQLFunction( "group_concat", StandardBasicTypes.STRING ) );

        getDefaultProperties().setProperty( Environment.STATEMENT_BATCH_SIZE, DEFAULT_BATCH_SIZE );

        limitHandler = new ImpalaLimitHandler();
    }

    @Override
    public String getAddColumnString() {
        return "add column";
    }

    @Override
    public boolean supportsLockTimeouts() {
        return false;
    }

    @Override
    public String getForUpdateString() {
        return "";
    }

    @Override
    public LimitHandler getLimitHandler() {
        return limitHandler;
    }

    @Override
    public boolean supportsLimit() {
        return true;
    }

    @Override
    public String getLimitString(String sql, boolean hasOffset) {
        return sql + (hasOffset ? " limit ? offset ? " : " limit ?");
    }

    @Override
    public boolean bindLimitParametersFirst() {
        return true;
    }

    @Override
    public boolean supportsIfExistsAfterTableName() {
        return false;
    }

    @Override
    public boolean supportsIfExistsBeforeTableName() {
        return true;
    }

    @Override
    public boolean supportsColumnCheck() {
        return true;
    }

    @Override
    public boolean supportsSequences() {
        return true;
    }

    @Override
    public boolean supportsPooledSequences() {
        return true;
    }

    @Override
    protected String getCreateSequenceString(String sequenceName) {
        return "create sequence " + sequenceName + " start with 1";
    }

    @Override
    protected String getCreateSequenceString(String sequenceName, int initialValue, int incrementSize) throws MappingException {
        if ( supportsPooledSequences() ) {
            return "create sequence " + sequenceName + " start with " + initialValue + " increment by " + incrementSize;
        }
        throw new MappingException( getClass().getName() + " does not support pooled sequences" );
    }

    @Override
    public SequenceInformationExtractor getSequenceInformationExtractor() {
        return SequenceInformationExtractorHSQLDBDatabaseImpl.INSTANCE;
    }

    @Override
    public String getSelectClauseNullString(int sqlType) {
        String literal;
        switch ( sqlType ) {
            case Types.BIGINT:
                literal = "cast(null as bigint)";
                break;
            case Types.BOOLEAN:
                literal = "cast(null as boolean)";
                break;
            case Types.CHAR:
                literal = "cast(null as string)";
                break;
            case Types.DECIMAL:
                literal = "cast(null as decimal)";
                break;
            case Types.DOUBLE:
                literal = "cast(null as double)";
                break;
            case Types.FLOAT:
                literal = "cast(null as float)";
                break;
            case Types.INTEGER:
                literal = "cast(null as int)";
                break;
            case Types.SMALLINT:
                literal = "cast(null as smallint)";
                break;
            case Types.VARCHAR:
                literal = "cast(null as string)";
                break;
            case Types.BLOB:
                literal = "cast(null as string)";
                break;
            case Types.CLOB:
                literal = "cast(null as string)";
                break;
            case Types.TINYINT:
                literal = "cast(null as tinyint)";
                break;
            case Types.TIMESTAMP:
                literal = "cast(null as timestamp)";
                break;
            default:
                literal = "cast(null as string)";
        }
        return literal;
    }

    @Override
    public boolean supportsUnionAll() {
        return true;
    }

    @Override
    public boolean supportsCurrentTimestampSelection() {
        return true;
    }

    @Override
    public boolean isCurrentTimestampSelectStringCallable() {
        return false;
    }

    @Override
    public String getCurrentTimestampSelectString() {
        return "select current_timestamp()";
    }

    @Override
    public String getCurrentTimestampSQLFunctionName() {
        return "current_timestamp";
    }

    @Override
    public boolean supportsCommentOn() {
        return true;
    }

    // Overridden informational metadata ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

    @Override
    public boolean supportsEmptyInList() {
        return false;
    }

    @Override
    public boolean requiresCastingOfParametersInSelectClause() {
        return true;
    }

    @Override
    public boolean doesReadCommittedCauseWritersToBlockReaders() {
        return true;
    }

    @Override
    public boolean doesRepeatableReadCauseReadersToBlockWriters() {
        return true;
    }

    @Override
    public boolean supportsLobValueChangePropogation() {
        return false;
    }

    @Override
    public String toBooleanValueString(boolean bool) {
        return String.valueOf( bool );
    }

    @Override
    public NameQualifierSupport getNameQualifierSupport() {
        return NameQualifierSupport.SCHEMA;
    }

    @Override
    public boolean supportsNamedParameters(DatabaseMetaData databaseMetaData) throws SQLException {
        return false;
    }

    @Override
    public boolean dropConstraints() {
        return false;
    }

    @Override
    public String getCascadeConstraintsString() {
        return " CASCADE ";
    }
}

 

4. Resolver 등록

 

이제 Custom Database, Dialect, Resolver는 모두 만들었습니다.

그럼 이 클래스들이 hibernate가 사용할 수 있도록 등록해주면 됩니다.

등록하는 코드는 아래와 같습니다.

Properties properties = new Properties();
properties.setProperty("hibernate.dialect_resolvers", "CustomResolver 경로");
localContainerEntityManagerFactoryBean.setJpaProperties(properties);

 

4. 마무리

이번 포스팅에서는 조회 용도의 Impala-Hibernate 조합을 알아보았습니다.

 

많지는 않겠지만 혹시 사용이 필요한 사람들을 위해 코드를 공유합니다.

 

감사합니다.

반응형

'BigData > Impala' 카테고리의 다른 글

Impala - Mybatis  (0) 2021.02.24
Impala  (1) 2021.01.02
반응형

1. 서론

이번 포스팅에서는 앱단에서 mybatis 를 통해 Impala를 사용하는 과정에서 생긴 문제점을 공유하려 합니다.

 

2. 문제점

impala 는 hadoop에 있는 다양한 데이터( = hdfs, hbase, kudu, etc) 들을 SQL을 통해 간편하고 빠르게 조회 및 처리할 수 있도록 도와주는 인터페이스입니다.

 

mybatis는 SQL 매퍼로서 앱단에서 dynamic query를 가능하게 해주며,

xml을 이용하여 SQL과 java와 같은 프로그래밍언어를 분리하도록 도와주는 라이브러리입니다.

 

impala와 mybatis 를 같이 사용하면 간편한 코드작성으로 impala를 통해 hadoop 데이터를 핸들링 할 수 있습니다.

 

하지만, 최신 impala와 mybatis 버전에는 문제점이 하나 있습니다.

바로, java의 LocalDateTime 필드 처리입니다.

 

먼저, mybatis를 이용하여 impala 데이터를 처리하는 과정을 설명하겠습니다.

 

1) LocalDateTimeTypeHandler

 

mybatis는 vo class의 필드를 보고, LocalDateTime 필드의 경우 LocalDateTimeTypeHandler 를 호출하게 됩니다.

public class LocalDateTimeTypeHandler extends BaseTypeHandler<LocalDateTime> {

  @Override
  public void setNonNullParameter(PreparedStatement ps, int i, LocalDateTime parameter, JdbcType jdbcType)
          throws SQLException {
    ps.setObject(i, parameter);
  }

  @Override
  public LocalDateTime getNullableResult(ResultSet rs, String columnName) throws SQLException {
    return rs.getObject(columnName, LocalDateTime.class); // 문제 부분입니다.
  }

  @Override
  public LocalDateTime getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
    return rs.getObject(columnIndex, LocalDateTime.class);
  }

  @Override
  public LocalDateTime getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
    return cs.getObject(columnIndex, LocalDateTime.class);
  }
}

 

코드에서 보이는것과 같이 LocalDateTimeTypeHandler 에서는 ResultSet의 getObject를 사용합니다.

여기서 이 ResultSet은 jdbc 라이브러리에 있는 인터페이스이며, impala는 cloudera 측에서 제공하는 impalaJdbc 라이브러리가 있습니다.

 

2) S41ForwardResultSet

 

cloudera에서 제공하는 jdbc 라이브러리의 ResultSet 구현 클래스입니다.

클래스에는 getObject 메서드가 아래와 같이 구현 되어있고, Util 클래스인 ResultSetUtilities 의 getObjectByType 메서드를 호출하는것을 볼 수 있습니다.

 

public <T> T getObject(int var1, Class<T> var2) throws SQLException {
    try {
        LogUtilities.logFunctionEntrance(this.m_logger, new Object[]{var1, var2});
        this.checkIfOpen();
        return ResultSetUtilities.getObjectByType(this, var1, var2);
    } catch (Exception var4) {
        throw ExceptionConverter.getInstance().toSQLException(var4, this.getWarningListener(), this.getLogger());
    }
}

 

3) ResultSetUtilities

 

아래는 getObjectByType 의 일부 코드입니다.

일부 코드만 봐도 알듯이, mybatis에서 전달한 Class타입을 가지고 알맞게 캐스팅하여 반환하는 로직입니다.

 

하지만, 이 if-else 문에는 아쉽게도 LocalDateTime.class 로 비교하는 구문이 없어 에러가 나게 됩니다.

if (var2.equals(String.class)) {
    return var0.getString(var1);
} else if (var2.equals(Time.class)) {
    return var0.getTime(var1);
} else if (var2.equals(Timestamp.class)) {
    return var0.getTimestamp(var1);

 

반응형

 

3. 해결법

1) cloudera 측 문의

 

소스 한줄추가하면 가능한거라 메일로 문의를 드렸습니다.

하지만, 외국 기업인것도 있으며 오픈소스가 아니기에 제공하는 jdbc에 대한 클라이언트 요청을 전문적으로 받아 픽스하려면

유료 사용을 검토해야만 했습니다.

 

하지만..... 단순히 한줄이면 되었기에 이대로 포기할 수 없었습니다.

 jdbc 라이브러리가 안되면 mybatis handler를 수정하기로 마음먹었습니다.

 

2) mybatis custom handler 등록

 

다행히도, mybatis에는 custom handler를 등록할 수 있도록 제공하고 있었습니다.

 

소스는 아래와 같이, sqlSesqlSessionFactoryBean 에 커스텀 핸들러를 등록하는겁니다.

sqlSessionFactoryBean.setTypeHandlersPackage("커스텀 핸들러 패키지 경로");

 

핸들러는 아래와 같이 등록하였습니다.

 

package 커스텀 핸들러 패키지 경로;

/**
 * Created by geonyeong.kim on 2021-02-24
 */
public class LocalDateTimeTypeHandler extends BaseTypeHandler<LocalDateTime> {

    @Override
    public void setNonNullParameter(PreparedStatement ps, int i, LocalDateTime parameter, JdbcType jdbcType)
            throws SQLException {
        ps.setTimestamp(i, Timestamp.valueOf(parameter));
    }

    @Override
    public LocalDateTime getNullableResult(ResultSet rs, String columnName) throws SQLException {
        Timestamp timestamp = rs.getTimestamp(columnName);
        return getLocalDateTime(timestamp);
    }

    @Override
    public LocalDateTime getNullableResult(ResultSet rs, int columnIndex) throws SQLException {
        Timestamp timestamp = rs.getTimestamp(columnIndex);
        return getLocalDateTime(timestamp);
    }

    @Override
    public LocalDateTime getNullableResult(CallableStatement cs, int columnIndex) throws SQLException {
        Timestamp timestamp = cs.getTimestamp(columnIndex);
        return getLocalDateTime(timestamp);
    }

    private static LocalDateTime getLocalDateTime(Timestamp timestamp) {
        if (timestamp != null) {
            return timestamp.toLocalDateTime();
        }
        return null;
    }
}

 

사실, 이 핸들러 코드는 mybatis 마이너버전의 소스입니다.

하지만, LocalDateTime 하나때문에 마이너한 mybatis 버전을 메이저로 올리지 못했기에 mybatis 버전은 메이저로, 문제가 되는 handler는 커스텀하여 해결하였습니다.

 

4. 마무리

apache impala 에서는 timestamp 컬럼을  java 언어에서 사용할 시 LocalDateTime을 사용하라고 권장하고 있습니다.

다만, jdbc에서는 아직 지원이 되지 않아 겪은 문제입니다.

 

하루빨리 jdbc 에도 LocalDateTime 지원이 되어 편하게 개발하는 날이 오길 기대하고 있습니다.

 

감사합니다.

 

 

반응형

'BigData > Impala' 카테고리의 다른 글

Impala - Hibernate  (1) 2021.02.24
Impala  (1) 2021.01.02
반응형

1. 서론

이번 포스팅에서는 Apache Impala에 대해 소개하려고 합니다.

 

Cloudera 도큐먼트를 보며 개인적으로 6.x 버전부터 Impala 와 Kudu 조합을 추천하고 있다고 느꼈습니다.
때문에, Hadoop에 관심이 있으신 분이라면 Impala와 Kudu에 대해 알아보는 것을 추천합니다.

물론, Impala는 Kudu가 아니더라도 File에 대해서도 SQL을 수행할 수 있는 서비스입니다.

 

2. Impala란?

임팔라는 Hadoop Eco에 저장되어 있는 데이터를 실시간 병렬 조회가 가능한 SQL 쿼리 엔진입니다.

 

Impala는 아래와 같이 병렬성을 가질수 있는 아키텍처로 이루어져 있습니다.

 

 

 

1) Impalad

 

Impalad는 impala daemon으로 실제 쿼리를 수행하는 역할을 담당합니다.

 

Impalad는 그림과 같이 Query Compiler, Query Coordinator, Query Executor로 이루어져 있습니다.

 

Compiler는 말그대로 client에서 요청받은 SQL 질의를 Compile 한 후 어떻게 처리할지 결정합니다.

Coordinator는 Compiler가 전달한 요청서를 받아 각각 다른 호스트에 있는 Impalad의 Executor에 요청합니다.

이는 데이터의 locality를 보장하며 이로인해 분산 질의 처리가 가능해지며 성능이 향상됩니다.

 

마지막으로 Executor는 실제로 자신이 담당하는 데이터에 대해서 SQL 질의를 수행하는 역할을 합니다.

질의 수행이 끝난 Executor는 Coordinator에게 결과를 반환합니다.

 

 

여기서, 눈치 채셨겠지만 각 Impalad는 metadata를 가지고 있습니다.

metadata는 필요한 데이터가 어느 호스트(노드)에 있는지에 대한 mapping 정보가 담겨져 있습니다.

 

Impalad가 metadata를 가지고 있기 때문에, Client의 요청은 모든 Impalad가 수신 할 수 있습니다.

 

 

2) StateStore

 

StateStore는 각 Impalad의 상태를 관리하며 데이터 일관성을 위해 Metadata의 변경 사항을 모든 Impalad에 브로드캐스팅하는 역할을 담당합니다.

 

StateStore는 Impala 서비스에서 없다고 동작이 안하진 않습니다.

다만, 클러스터 내에서 Impalad가 제외된 경우 쿼리 가능한 daemon 그룹에서 제외를 못하며, metadata 갱신이 안되는 경우가 발생할 수 있습니다.

 

때문에, 사실상 운영시 StateStore도 띄어야 합니다.

 

3) Catalog Service

 

Catalog Service는 Impala 쿼리로 데이터의 CUD 혹은 DDL 작업 시,

이를 impalad의 metadata에 반영하기 위해 StateStore에 브로드캐스팅해달라고 요청하는 역할을 담당합니다.

브로드캐스팅은 StateStore를 통해서 수행되므로 Catalog Service와 StateStore는 같은 호스트에서 수행하는 것이 좀 더 효율적입니다.

그림에서와 같이 Catalog Service도 metadata가 있으며, 이 metadata가 원본이라고 생각하시면 됩니다.

 

3. Impala 사용시 주의점

Impala는 HMS( Hive MetaStore ) 가 떠있어야 가능합니다.

 

그 이유는 Catalog Service가 가지고 있는 Metadate 관리는 실제로 HMS에서 관리하는 데이터를 복사한것이기 때문입니다.

 

때문에, Hive와 Impala를 같이 사용시 주의할 점이 생기게 됩니다.

 

주의할 점은 HiveQL로 인한 metadata 변경은 impala에서 알 수 없다는 점입니다.

HiveQL로 변경된 데이터는 HMS에는 반영되나 impala에서 원본 metadata를 관리하는 Catalog Service에는 반영이 안되기 때문입니다.

 

하지만, 위에서 말씀드린것처럼 Catalog Service의 metadata 은 HMS에서 Copy한 것입니다.

그러니, HiveQL로 변경된 부분을 Impala metadata에도 반영하고 싶은 경우에는 HMS에서 Copy를 다시 하면 됩니다.

 

다시 변경 부분만을 Copy 해와 Impalad에 브로드캐스팅할 수 있도록 Impala는 REFRESH, INVALIDATE METADATA 쿼리를 제공합니다.

 

그럼, Hive에서도 Impala 쿼리로 인한 변경사항을 반영해야하는 문제가 있을까요?

결국 Metadata의 원본은 HMS이며, Impala에서 변경된 부분은 HMS에도 반영됩니다.

또한, Hive는 매 질의시 HMS에서 Metadata를 조회해 처리하기 때문에 Hive는 REFRESH,INVALIDATE METADATA 와 같은 작업을 하지 않아도 됩니다.

 

반응형

 

4. Hive 와 Impala 차이점

Hive와 Impala의 차이점은 동작의 차이로 인해 발생합니다.

Hive는 SQL을 통해 편하게 MR을 수행하는 서비스입니다.

Impala는 MR로 동작하지 않습니다. 위에서 소개한것과 같이 Impala는 MR이 아닌 SQL 엔진을 통해 동작합니다.

 

이로인해, Impala는 Hive에 비해 처리속도가 빠릅니다.

그렇다고, Hive를 모두 Impala 로 대체하는것은 옳바르진 않습니다.

 

우선, Hive, Impala 모두 OLAP 적합한 서비스입니다. 다만, Impala는 SQL 처리가 메모리상에서 이루어집니다.

때문에, 호스트에 메모리가 부족하거나 메모리를 너무 과도하게 차지하는 쿼리의 경우에는

Hive를 사용하는것이 클러스터에 부담도 주지않으며 좋은 선택일 수 있습니다.

물론, Hive는 MR이기에 Spark로 대체하는 방법도 좋은 방안입니다.
하지만, Spark도 기본은 메모리 연산이기에 각 요구사항에 맞게 선택하여 사용해야 합니다.

 

또한, Hive, Impala 는 OLAP에 적합한 서비스라고 했습니다. 하지만 이는 Hdfs 파일에 대해서 쿼리를 날리는 경우입니다.

 

Impala 를 SQL 인터페이스로 사용하지만 실제 데이터는 Kudu 혹은 Hbase에 있다면, 쿼리는 Kudu, Hbase 특성에 맞게 작성해야합니다.

예를들어, Kudu의 경우에는 PK에 대해서 B+Tree로 index를 가지고 있기 때문에 Where 절에 PK 넣는 쿼리의 경우에는 데이터가 대용량이 아니더라도 응답속도가 빠르게 나오게 됩니다.

impalad 가 Kudu Master 혹은 Hbase Master에 요청하여 데이터를 가져오기 때문입니다.
이런 경우에는 metadata도 사실상 무의미하며 해당 데이터를 담당하는 Kudu, Hbase 영역으로 넘어가게 됩니다.

하지만, 이런 저장소에서 제공하는 별도의 클라이언트를 사용하지 않고 Impala 를 통해 간편하게 SQL로 데이터를 Handling할 수 있다는 점이 Impala의 장점입니다.

 

5. HAProxy

Client의 요청은 모든 Impalad가 수신 할 수 있다고 했습니다.

때문에, 관리자 입장에서는 하나의 Impalad에만 요청이 쏠리지 않도록 HAProxy를 구축해야 합니다.

 

HAProxy를 구축하면 얻는 이점은 아래와 같습니다.

 

  1. 부하 분산
  2. Client의 요청을 단일 endpoint 로 관리 가능
    1. Impalad가 추가된다면 HAProxy에 추가된 Impalad 호스트만 추가하면 됩니다.
Cloudera Impala Document 에서는 무료 오픈소스인 haproxy 를 소개하고 있습니다.

 

6. 마무리

이번 포스팅에서는 Impala에 대해 알아봤습니다.

 

부족한 설명이였지만 읽어주셔서 감사합니다.

반응형

'BigData > Impala' 카테고리의 다른 글

Impala - Hibernate  (1) 2021.02.24
Impala - Mybatis  (0) 2021.02.24
반응형

1. 서론

이번 포스팅에서는 Apache Hadoop Eco 저장소 중 하나인 Kudu 에 대해 소개하려고 합니다.

개인적으로 최근 Kudu 를 사용하는 회사들이 늘어나는거 같습니다.
아무래도, cloudera 측에서 impala와의 연계 저장소로 추천하고 있다는 점이 크다고 봅니다.

 

소개 목차로는 아래와 같습니다.

 

  • Kudu란?
  • Hbase와의 차이점
  • Kudu 지원 & 미지원

2. Kudu란?

Kudu는 Hadoop Eco 저장소 중 하나이며, Columnar Storage 입니다.

Columnar Storage인 이유는 Mongo, Hbase와 같이 schemaless 여서가 아니며, 물리적으로 Column 별로 파일에 저장하기 때문입니다.

 

Kudu 는 분산 플랫폼으로 아래와 같은 아키텍처를 가지고 있습니다.

 

 

1) Tablet

 

Kudu에서 테이블은 파티셔닝 테이블입니다.

파티셔닝 테이블이란 특정 column을 기준으로 데이터를 나눠 저장한다는 의미입니다.

 

바로 이 파티셔닝 테이블에서 하나의 파티셔닝이 Tablet 입니다.

데이터를 샤딩하는것과 같으며, 하나의 샤딩이라고 보시면 됩니다.

 

이 Tablet은 Leader, Follower로 이루어져 있습니다.

모든 Write 요청은 Leader가 받으며, Read 요청은 Leader, Follower 모두 받습니다.

 

Leader, Follower에서 Read가 가능하게 하기 위해 Write의 동작방식은 아래와 같이 이루어지게 됩니다.

 

 

 

 

데이터 유실을 방지하기 위해 가장 먼저 Leader의 WAL에 쓰기 작업을 하며,

그 이후는 각 Follwer에게 쓰기를 요청하여 모두 성공한 경우 클라이언트에게 성공 응답을 보내게 됩니다.

개인적으로 이런 부분은 분산 플랫폼 중 Kafka와 유사하다고 볼 수 있습니다.

 

2) Tablet Server

 

Tablet Server는 Tablet 을 가지고 있는 서버를 말하며, 여러개의 Tablet 을 가지고 있습니다.

 

Tablet Server는 크게 Master 서버와 Tablet 서버로 나누어지며,

각 역할은 Hdfs의 Name 노드와 Data 노드와 같이 Tablet의 메타데이터는 Master Server, 실제 데이터는 Tablet Server에 있습니다.

Master 서버의 메타데이터 또한 Leader, Follwer 구조로 이루어져 있습니다.

 

3) MRS

 

MRS는 MemRowSet으로 WAL에 데이터 Write 후 써지는 저장소입니다.

메모리로 이루어져 있으며 B+ tree로 구성되어져 있습니다.

 

아래는 MRS 그림입니다.

 

 

 

4) DRS

 

MRS가 일정 시간 혹은 크기가 넘어가게 되면 Disk에 Write를 하게됩니다.

바로 Write하는 곳이 DiskRowSet이며 DRS 라고 합니다.

 

5) DeltaMemeStore

 

DeltaMemStore는 Update 요청으로 데이터 변경분을 주기적으로 Redo 파일에 Write하는 메모리 영역입니다.

 

Kudu는 MRS, DRS, DeltaMemStore와 같이 중간에 메모리와 디스크를 통해 영구적인 데이터 보존과 함께, 속도까지 겸비한것을 알 수 있습니다.

 

하지만, 조회를 하는 클라이언트 입장에서는 MRS, DRS, RedoFile 등등 조회할 부분이 많습니다.

이를 위해, Kudu는 compaction 작업을 통해 조회시 접근할 메모리 영역과 디스크 영역을 최소화합니다.

 

 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

 

3. Hbase와의 차이점

 

Kudu와 Hbase는 비슷한 점이 많습니다.

비슷한 부분은 Tablet <-> Region, Tablet Server <-> Region Server, Columnar Storage 등이 있습니다.

 

하지만 apache 깃헙을 가보시면 Hbase, Kudu는 별도의 프로젝트이며, 엄연히 사용처가 다른 것을 짐작할 수 있습니다.

 

Hbase와 Kudu의 가장 큰 차이점은 자료구조입니다.

Hbase는 LSM인 Log Structured Merge 방식을 사용하며, Kudu는 B+tree 방식의 MRS, DRS를 사용합니다.

 

이로인해, key를 통해 read하는 경우 Kudu가 Hbase에 비해 속도가 우수합니다.

물론, Hbase의 경우에도 Read 성능을 올리기위해 저장파일인 HFile이 Multi-layerd index 방식으로 이루어져 있습니다. 

 

두번째 차이점으로는 저장 파일단위입니다.

Hbase는 HFile, Kudu는 CFile로 데이터를 저장합니다.

 

이로인해, table의 컬럼별 집계와 같은 aggregation 쿼리의 경우 Kudu가 File IO가 적게 들어 우수한 성능이 나옵니다.

 

이러한 차이점으로 인해 Kudu가 우수한 부분도 있지만 Hbase가 우수한 부분도 있습니다.

 

첫째 Write 성능

 

Hbase는 MemStore, LSM 의 방식으로 인해 Write의 경우 Kudu보다 성능이 좀 더 우수합니다.

 

두번째 Scan 성능

 

Hbase의 경우 내부적으로 SortedMap 과 같이 정렬하여 데이터를 저장하고 있습니다.

이는, Scan 연산시 kudu에 비해 유리하게 작동할 수 있습니다.

 

세번째 컬럼별 버저닝

 

Hbase는 컬럼별 버저닝을 지원하고 있습니다. 이는 kudu에 비해 우수한 부분이기 보단 차이점으로,

이력 데이터를 관리하기에 용이한 저장소입니다.

 

4. Kudu 지원 & 미지원

kudu는 현재 지원되는 부분과 지원되지 않는 부분이 있습니다.

 

지원되는 부분으로는 아래와 같습니다.

 

  1. compound primary key
  2. single row transaction

미지원 항목은 아래와 같습니다.

 

  1. secondary indexes
  2. multi row transaction
  3. TTL(Time-to-Live)

 

5. 마무리

이번 포스팅에서는 Kudu에 대해 알아봤습니다.

 

Kudu가 Hbase의 대체제는 아니며

서비스 특성에 따라 Kudu 가 잘 맞을수도, Hbase가 잘 맞을수도 있습니다.

 

좋은 서비스를 위해서는 각 저장소의 특징을 알고, 적절하게 선택하여 사용해야 합니다.

 

감사합니다.

반응형
반응형

1. 서론

안녕하세요.

 

저희 조직은 DB cdc를 kafka 로 흘려주며 데이터의 스키마는 schema registry를 사용하여 관리하고 있습니다.

 

최근 spring kafka + schema registry + gradle plugin 를 사용하여 application 에서 cdc 데이터를 consume 하는 작업을 하였는데

다른 분들에게 공유하기에 좋다고 생각들어 포스팅하였습니다.ㅎㅎ

 

2. gradle plugin

schema registry 를 사용하여 kafka record를 consume하는 과정은 아래 절차로 이루어져야 합니다.

 

  1. schema registry 를 통해 avro 스키마 DOWNLOAD
  2. DOWNLOAD 한 avro 스키마를 기반으로 java class 생성
  3. 생성된 java class 를 kafka consume record의 모델로 사용

이때, 1~2번의 과정은 빌드 도구인 gradle에서 제공하는 plugin이 있습니다.

 

1) com.github.imflog:kafka-schema-registry-gradle-plugin

 

schema registry에서 avsc file을 다운로드 하는 plugin 입니다.

 

plugin 적용 방법은 아래와 같습니다.

 

1. repositories 추가

 

build.gradle 파일에 repository를 등록해주세요.

 

 

2. dependencies classpath 추가

 

repository를 등록했으니 필요한 dependencies classpath를 추가합니다.

 

 

이때, 버전 호환이 중요합니다.

 

kafka-schema-registry-gradle-plugin 과 gradle-avro-plugin 는 모두 org.apache.avro:avro를 참조하고 있습니다.

 

org.apache.avro:avro 의 1.8.x 버전은 bug 도 존재할 뿐더러 time 관련된 부분을 java.utile.time 이 아닌 joda를 사용하고 있는데요.

때문에, 가능하다면schema-registry-plugin:5.5.1 , gradle-avro-plugin:0.21.0 의 조합으로 사용하는것을 권장합니다.

권장하는 조합의 plugin 버전은 org.apache.avro:avro 1.10.x 를 사용하며
miner 버전 조합인 schema-registry-plugin:5.3.4 , gradle-avro-plugin:0.16.0 은 org.apache.avro:avro 1.8.x를 사용하고 있습니다.

 

3. gradle task 추가

 

아래와 같이 plugin은 apply 하게 되면 2번째 사진과 같이 schemaRegistry 구문을 사용할 수 있습니다.

download.subject는 subjectName, download path 로 이해하시면 됩니다.

저의 경우 consume만 하므로 download task만 적용하였습니다.

 

 

plugin에 대한 자세한 내용은 여기를 참고해주세요.

 

 

2) com.commercehub.gradle.plugin:gradle-avro-plugin

 

avsc file 을 사용하여 java class를 생성해주는 plugin 입니다.

 

plugin 적용 방법은 아래와 같습니다.

 

1. repository 및 dependencies 추가

 

buildscript { 
	repositories { 
    	jcenter() 
    } 
    dependencies { 
    	classpath "com.commercehub.gradle.plugin:gradle-avro-plugin:VERSION" 
    } 
}

 

2. gradle plugin apply 추가

 

apply plugin: "com.commercehub.gradle.plugin.avro-base"

 

위 1~2번까지 완료하신 후 gradle sync를 한번 해주신다면 generateAvro라는 task 를 사용할 수 있습니다.

 

해당 task 를 수행하게 되면 avsc 파일을 사용하여 java class를 생성하게 됩니다.

avro file path, gerate java class path 같은 부분은 gradle 파일에서 custom 가능합니다.

 

 

반응형

 

 

3. 적용 시 겪은 문제점

저의 경우 2개의 plugin 모두 적용 후 실제 app에서 kafka record를 가져가는데 문제가 있었습니다.

 

1) stringType

 

문제에 대해 설명하기 전 avro의 string type은 3가지를 지원하고 있습니다.

Utf8의 경우에는 python과 같은 java 뿐만이 아닌 언어의 string type까지 지원하기 위해서 인것 같습니다.
  • CharSequence (java.lang.CharSequence)
  • String (java.lang.String)
  • Utf8 (org.apache.avro.util.Utf8)
기본은 String 이며 아래와 같이 변경도 가능합니다.

 

문제는 com.commercehub.gradle.plugin:gradle-avro-plugin 를 통해 생성한 java class에 kafka record를 setting 할때 발생했습니다.

 

kafka record를 java class에 세팅하는 작업은 plugin을 통해 만든 java class의 put method를 통해 이루어 집니다.

 

put 메서드는 아래와 같이 type casting을 (java.lang.String) 으로 하게 되는데요.

제가 consume할 kafka record는 python code 로 publishing을 하고 있어 type casting exception이 발생하게 됩니다.

 

 

하지만, 이 문제는 다행히 plugin version up으로 해결되었습니다.

 

최신 version인 gradle-avro-plugin:0.21.0 를 사용하면 아래와 같이 put 메서드가 만들어집니다.

 

.toString으로 type casting 하도록 수정되어 더이상 type casting 예외는 발생하지 않게됩니다.

 

 

2) java.time.Instant

 

저희는 kafka record에서 timestamp 관련 데이터는 long type의 logicalType은 timestamp-millis로 정의해서 사용하고 있습니다.

 

이 경우, put method는 아래와 같이 만들어집니다.

 

 

type이 long인데 , java.time.Instant 타입을 사용하는게 의아해 할 수 있는데요.

이유는 logicalType으로 timestamp-millis 를 지정했기 때문입니다.

참고 - TimeConversions

 

하지만, 실제로 데이터를 consume 과정에서는 아래 예외가 발생하게 됩니다.

java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.time.Instant 

 

이유는, 역직렬화시 long type의 데이터를 logicalType 명시에 맞도록 casting 되지 않아서 입니다.

 

이 문제는, kafka consumer의 설정으로 해결할 수 있었습니다.

kafka + schema registry 연계를 많이 사용하고 있어 kafka 측에서는 consumer에 아래와 같은 옵션을 제공합니다.

 

 

기본은 false이며, avro schema를 사용하여 consume 하는 경우에는 해당 옵션을 true로 변경하여 사용하면 됩니다.

 

4. 마무리

이번 포스팅은 spring kafka + schema registry + gradle plugin 적용에 대해서 진행했습니다.

 

모두, 저같이 삽질을 하지 않았으면 좋겠다는 마음으로 작성하게 되었는데요.

도움이 되었으면 좋겠네요ㅎㅎ

 

감사합니다.

반응형

'MQ > Kafka' 카테고리의 다른 글

(7) Kafka Trouble Shooting  (0) 2021.11.19
(5) 카프카 사용 시 고려사항  (0) 2020.02.26
(4) 카프카 매니저 & 스키마 레지스트리  (0) 2020.02.26
(3) 카프카 사용 예제  (0) 2020.02.25
(2) 카프카 설치  (0) 2020.02.25
반응형

1. 서론

이번 포스팅에서는 6장인 스프링 클라우드와 주울로 서비스 라우팅에 대해 알아보도록 하겠습니다.

 

2. 서비스 게이트웨이란?

서비스 게이트웨이는 서비스 클라이언트와 호출될 서비스 사이에서 중개 역할을 하며,

어플리케이션 안의 마이크로 서비스 호출로 유입되는 모든 트래픽에 대해 게이트키퍼 역할을 합니다.

 

아래는 서비스 게이트웨이를 적용한 일반적인 호출 그림입니다.

 

그림을 보시는것과 같이 서비스 게이트웨이는 중앙 집중식 정책 시행 지점의 역할로,

아래와 같은 이점을 얻을 수 있습니다.

 

  1. 정적 라우팅 : 단일 서비스 URL과 API 경로로 모든 서비스를 호출하게 합니다.
  2. 동적 라우팅 : 서비스 요청 데이터를 기반으로 서비스 호출자 대상에 따라 지능형 라우팅을 수행 할 수 있습니다.
  3. 인증 & 인가 : 호출의 맨 앞단에 있기 때문에, 인증 & 인가를 확인하기 최적의 장소입니다.
  4. 측정 지표 수집 & 로깅 : 서비스 호출에 대한 측정 지표와 로그 수집에 용이합니다.
앞장에서 설명한것과 같이 서비스 게이트웨이 역시 잘못 설계 시 병목점이 될 수 있습니다.
책에서는 서비스 게이트웨이 앞단에 로드 밸런서를 두고, 게이트웨이를 stateless로 하는 것을 권장합니다.

 

3. 스프링 클라우드와 넷플릭스 주울 소개

스프링 클라우드에서는 주울이라는것을 통해 서비스 게이트웨이를 제공합니다.

 

아래는 주울이 제공하는 기능입니다.

 

  1. 어플리케이션의 모든 서비스 경로를 단일 URL로 매핑
  2. 게이트웨이로 유입되는 요청을 검사하고 대응할 수 있는 필터 작성

 

주울서버를 만드는 방법은 아래와 같습니다.

 

 

1) 주울 라이브러리 추가

 

compile 'org.springframework.cloud:spring-cloud-starter-netflix-zuul:2.2.3.RELEASE'

 

2) 주울 서비스를 위한 스프링 클라우드 어노테이션 추가

 

주울 서버로 등록하기 위해 @EnableZuulProxy 어노테이션을 추가합니다.

 

@SpringBootApplication
@EnableZuulProxy
public class ZuulServerApplication {

    public static void main(String[] args) {
        SpringApplication.run(ZuulServerApplication.class, args);
    }
}

 

@EnableZuulServer 도 있으며, 이는 자체 라우팅 서비스를 만들고 내장된 주울 기능을 사용하지 않을 때 사용하는 어노테이션입니다.

 

3) 유레카와 통신하는 주울 구성

 

주울은 스프링 클라우드 제품과 같이 동작하도록 설계되었습니다.

따라서, 자동으로 유레카를 사용해 서비스 ID로 서비스를 찾은 후 넷플릭스 리본으로 주울 내부에서 요청에 대한 클라이언트 측 부하 분산을 수행합니다.

 

아래는 유레카를 사용하도록 application.yml을 수정한 내용입니다.

 

eureka:
  instance:
    preferIpAddress: true
  client:
    registerWithEureka: true
    fetchRegistry: true
    serviceUrl:
      defaultZone: http://localhost:8761/eureka/

 

4. 주울에서 경로 구성

주울은 기본적으로 리버스 프록시입니다.

리버스 프록시는 자원에 접근하려는 클라이언트와 자원 사이에 위치한 중개 서버를 의미합니다.

 

주울은 아래와 같은 프록시 메커니즘을 제공합니다.

 

  1. 서비스 디스커버리를 이용한 자동 경로 매핑
  2. 서비스 디스커버리를 이용한 수동 경로 매핑
  3. 정적 URL을 이용한 수동 경로 매핑

 

1) 서비스 디스커버리를 이용한 자동 경로 매핑

 

주울은 application.yml 을 통해 모든 경로를 매핑하며, 특별한 구성 없이도 서비스 ID를 기반으로 요청을 자동 라우팅 합니다.

 

아래와 같이 서비스의 엔드포인트 경로 첫 부분에 호출하려는 서비스를 표시하여 사용합니다.

 

http://localhost:5555/organizationservice/v1/organizations/~~

 

아래는 유레카와 주울의 조합으로 동작하는 그림입니다.

 

 

 

주울의 매핑 정보를 확인하고 싶은 경우에는 /routes 엔드포인트를 통해 가능합니다.
http://localhost:5555/actuator/routes

 

2) 서비스 디스커버리를 이용한 수동 경로 매핑

 

주울은 유레카 서비스 ID로 자동 생성된 경로에 의존하지 않고 명시적으로 매핑 경로를 정의할 수도 있습니다.

 

아래는 application.yml 을 통해 organizationservice -> organization으로 경로를 수동 매핑한 예제 입니다.

 

zuul:
  ignored-services: 'organizationservice'
  routes:
    organizationservice: /organization/**

 

ignored-services 설정은 자동 생성 된 유레카 서비스 ID 경로를 제외하고 사용자 정의한 경로만 사용할 때 쓰며,

쉼표 구분자로 여러개 기입할 수 있습니다.

 

 

3) 정적 URL을 이용한 수동 경로 매핑

 

주울은 유레카로 관리하지 않는 서비스도 라우팅하도록 기능을 제공합니다.

 

아래는 정적 URL을 수동으로 매핑한 예제입니다.

 

zuul:
  routes:
    licensestatic:
      path: /licensestatic/**
      url: http://licenseservice-static:8081

 

이 설정은 한가지 문제점을 가지고 있습니다.

바로 유레카를 사용하지 않아, 요청할 경로가 하나만 있다는 점입니다.

 

하지만, 리본을 사용하여 클라이언트 부하 분산을 이용할 수 있습니다.

 

아래는 리본을 사용하여 정적 URL을 수동 매핑한 예제입니다.

 

zuul:
  routes:
    licensestatic:
      path: /licensestatic/**
      serviceId: licensestatic
ribbon:
  eureka:
   enabled: false

licensestatic:
  ribbon:
    listOfServers: http://licenseservice-static1:8081, http://licenseservice-static2:8082

 

 

4) 경로 구성을 동적으로 로딩

 

주울은 경로 구성 정보를 동적으로 로딩이 가능합니다.

컨피그 서버에서 살펴본것과 같이 액츄에이터의 /refresh를 호출하는 방법으로 아래와 같이 application.yml 을 구성하면 됩니다.

 

 

5) 주울과 서비스 타임아웃

 

주울은 히스트릭스 타임아웃 프로퍼티를 설정하여, 오래 수행되는 서비스 호출을 차단하여 성능에 악영향을 끼치지 않도록 할 수 있습니다.

 

아래는 히스트릭스 타임아웃 프로퍼티를 설정한 예입니다.

 

 

위 설정의 경우, 모든 서비스에 대한 타임아웃이 2.5초로 설정되어 집니다.

 

만약, 특정 서비스에 대해 별도로 타임아웃을 설정하기 위해서는 아래와 같이 default 부분을 서비스 ID로 재정의하면 됩니다.

 

기본적으로, 주울은 리본 + 유레카를 사용합니다.

여기서 리본은 5초의 디폴트 타임아웃 설정이 있으며, 위 ribbon.ReadTimeout 을 통해 커스텀이 가능합니다.

 

5. 주울의 진정한 힘! 필터

주울은 모든 서비스 호출의 진입점이기 때문에, 호출에 대해 사용자 정의 로직을 작성할 수 있도록 필터를 제공합니다.

 

아래는 제공하는 필터의 종류입니다.

 

  1. 사전 필터 : 목표 대상에 대한 실제 요청이 발생하기 전에 호출되는 필터
  2. 사후 필터 : 서비스를 호출하고 응담을 클라이언트로 전송한 후 호출되는 필터
  3. 경로 필터 : 서비스가 호출되기 전에 가로채는데 사용되는 필터

 

아래는 필터들이 주울에서 어떻게 동작되는지에 대한 그림입니다.

 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

6 . 상관관계 ID를 생성하는 주울의 사전 필터 작성

아래는 주울의 사전 필터를 만든 예제 코드입니다.

 

@Component
public class TrackingFilter extends ZuulFilter{
    private static final int      FILTER_ORDER =  1;
    private static final boolean  SHOULD_FILTER=true;
    private static final Logger logger = LoggerFactory.getLogger(TrackingFilter.class);

    @Autowired
    FilterUtils filterUtils;

    @Override
    public String filterType() {
        return FilterUtils.PRE_FILTER_TYPE;
    }

    @Override
    public int filterOrder() {
        return FILTER_ORDER;
    }

    public boolean shouldFilter() {
        return SHOULD_FILTER;
    }

    private boolean isCorrelationIdPresent(){
      if (filterUtils.getCorrelationId() !=null){
          return true;
      }

      return false;
    }

    private String generateCorrelationId(){
        return java.util.UUID.randomUUID().toString();
    }

    public Object run() {

        if (isCorrelationIdPresent()) {
           logger.debug("tmx-correlation-id found in tracking filter: {}. ", filterUtils.getCorrelationId());
        }
        else{
            filterUtils.setCorrelationId(generateCorrelationId());
            logger.debug("tmx-correlation-id generated in tracking filter: {}.", filterUtils.getCorrelationId());
        }

        RequestContext ctx = RequestContext.getCurrentContext();
        logger.debug("Processing incoming request for {}.",  ctx.getRequest().getRequestURI());
        return null;
    }
}

 

여기서, 유의깊게 볼것은 ZuulFilter 입니다.

 

주울은 ZuulFilter를 구현하게 하여 필터를 등록할 수 있게 제공합니다.

 

아래는 ZuulFilter 를 구현하기 위해서는 아래 4개 메서드를 재정의해야 합니다.

 

public String filterType();
public int filterOrder();
boolean shouldFilter();
Object run() throws ZuulException;

 

 

각 메서드의 의미는 아래와 같습니다.

 

  • filterType : 사전, 경로, 사후필터인지 지정
  • filterOrder : 주울이 다른 필터 유형으로 요청을 보내야 하는 순서
  • shouldFilter : 필터의 활성화 여부
  • run : 필터를 통과할 때 수행되는 로직

 

 

위 TrackingFilter 는 사전필터로, 유입되는 요청에 상관관계 ID를 부여하는 필터입니다.

 

run 메서드에 있는 FilterUtils.setCorrelationId 메서드는 아래와 같습니다.

 

public void setCorrelationId(String correlationId){
    RequestContext ctx = RequestContext.getCurrentContext();
    ctx.addZuulRequestHeader(CORRELATION_ID, correlationId);
}

 

 

 

메서드를 보면 addZuulRequestHeader 를 볼 수 있습니다.

 

주울은 유입되는 요청에 직접 헤더를 추가하거나 수정하는 것을 금합니다.

때문에, 주울이 별도로 관리하는 헤더 맵에 추가해야 합니다.

 

이 주울이 별도로 관리하는 헤더 맵에 데이터를 추가 및 수정하는 메서드가 바로 addZuulRequestHeader 이며,

이는 주울이 서비스를 호출할 때 요청 헤더와 합쳐 전송하게 됩니다.

 

7. 상관관계 ID를 전달받는 사후 필터 작성

주울은 서비스 클라이언트 대신해 실제 HTTP 호출을 수행합니다.

때문에, 주울은 사후필터를 통해 서비스 호출에 대한 응답에 대해서 검사, 수정, 추가 정보를 넣을 수 있습니다.

 

아래는 사후필터의 예제입니다.

 

@Component
public class ResponseFilter extends ZuulFilter{
    private static final int  FILTER_ORDER=1;
    private static final boolean  SHOULD_FILTER=true;
    private static final Logger logger = LoggerFactory.getLogger(ResponseFilter.class);
    
    @Autowired
    FilterUtils filterUtils;

    @Override
    public String filterType() {
        return FilterUtils.POST_FILTER_TYPE;
    }

    @Override
    public int filterOrder() {
        return FILTER_ORDER;
    }

    @Override
    public boolean shouldFilter() {
        return SHOULD_FILTER;
    }

    @Override
    public Object run() {
        RequestContext ctx = RequestContext.getCurrentContext();

        logger.debug("Adding the correlation id to the outbound headers. {}", filterUtils.getCorrelationId());
        ctx.getResponse().addHeader(FilterUtils.CORRELATION_ID, filterUtils.getCorrelationId());

        logger.debug("Completing outgoing request for {}.", ctx.getRequest().getRequestURI());

        return null;
    }
}

 

예제를 보면, 사전필터와 동일하게 ZuulFilter를 상속받은 구조인 것을 아실것입니다.

 

사후 필터로 만들기 위해 filterType 는 FilterUtils.POST_FILTER_TYPE 로 세팅하였으며,

run에서는 서비스 호출 응답에 상관관계 ID를 넣어주는것을 보실 수 있습니다.

 

8. 동적 경로 필터 적용

마지막으로 살펴볼 필터는 동적 경로 필터입니다.

 

동적 경로 필터는 주울로 들어온 요청의 데이터를 통해 어느 서비스를 호출할건지 동적으로 설정할 수 있도록 개발자에게 위임하는 필터입니다.

 

동적 경로 필터는 A/B 테스팅과 같은 작업을 수행할 때 이용할 수 있습니다.

아래는 동적 경로 필터를 사용하여 A/B 테스팅을 수행할 시 이루어지는 그림입니다.

 

 

아래는 동적 경로 필터의 예제입니다.

 

@Component
public class SpecialRoutesFilter extends ZuulFilter {
    private static final int FILTER_ORDER =  1;
    private static final boolean SHOULD_FILTER =true;

    @Autowired
    FilterUtils filterUtils;

    @Autowired
    RestTemplate restTemplate;

    @Override
    public String filterType() {
        return filterUtils.ROUTE_FILTER_TYPE;
    }

    @Override
    public int filterOrder() {
        return FILTER_ORDER;
    }

    @Override
    public boolean shouldFilter() {
        return SHOULD_FILTER;
    }

    private ProxyRequestHelper helper = new ProxyRequestHelper();
    
    @Override
    public Object run() {
        RequestContext ctx = RequestContext.getCurrentContext();

        AbTestingRoute abTestRoute = getAbRoutingInfo( filterUtils.getServiceId() );

        if (abTestRoute!=null && useSpecialRoute(abTestRoute)) {
            String route = buildRouteString(ctx.getRequest().getRequestURI(),
                    abTestRoute.getEndpoint(),
                    ctx.get("serviceId").toString());
            forwardToSpecialRoute(route);
        }

        return null;
    }
}

 

위에서 유의 깊게 볼것은 2개가 있습니다.

 

  1. filterType에 동적 경로 필터로 등록하는 filterUtils.ROUTE_FILTER_TYPE
  2. 주울 라이브러리에서 제공하는 ProxyRequestHelper 클래스

ProxyRequestHelper 는 서비스 요청의 프록싱을 도와주는 주울에서 제공하는 helper 클래스입니다.

아래는 helper 클래스를 사용하는 forwardToSpecialRoute 메서드입니다.

 

private void forwardToSpecialRoute(String route) {
    RequestContext context = RequestContext.getCurrentContext();
    HttpServletRequest request = context.getRequest();

    MultiValueMap<String, String> headers = this.helper
            .buildZuulRequestHeaders(request);
    MultiValueMap<String, String> params = this.helper
            .buildZuulRequestQueryParams(request);

    String verb = getVerb(request);
    InputStream requestEntity = getRequestBody(request);

    if (request.getContentLength() < 0) {
        context.setChunkedRequestBody();
    }
    this.helper.addIgnoredHeaders();
    CloseableHttpClient httpClient = null;
    HttpResponse response = null;
    try {
        httpClient  = HttpClients.createDefault();
        response = forward(httpClient, verb, route, request, headers,
                params, requestEntity);
        setResponse(response);
    } catch (Exception ex ) {
        ex.printStackTrace();
    } finally{
        try {
            httpClient.close();
        }
        catch(IOException ex){}
    }
}

 

코드에서 보듯이 주울로 들어온 요청에 대한 header와 params 데이터를

간편하게 복사하는 buildZuulRequestHeaders, buildZuulRequestQueryParams 메서드를 제공하는 것을 볼 수 있습니다.

 

9. 마무리

이번 포스팅에서는 스프링 클라우드와 주울로 서비스 라우팅에 대해 알아보았습니다.

다음에는 마이크로서비스의 보안에 대해 포스팅하겠습니다.

반응형
반응형

1. 서론

이번 포스팅에서는 4장인 서비스 디스커버리에 대해 알아보도록 하겠습니다.

 

2. 서비스 위치 찾기

분산 아키텍처에서는 시스템의 물리적 위치 주소를 알아야하며, 이를 서비스 디스커버리라고 합니다.

 

서비스 디스커버리는 마이크로서비스 아키텍처 & 클라우드 환경에서 매우 중요합니다.

 

아래는 서비스 디스커버리의 이점입니다.

 

  1. 앱 팀은 서비스 디스커버리를 통해 해당 환경에서 실행하는 서비스 인스턴스 개수를 신속하게 수평 확장, 축소 가능합니다.
  2. 서비스 디스커버리 엔진은 사용할 수 없는 서비스 인스턴스로는 요청이 가지 않도록 라우팅하여 앱의 회복성이 향상됩니다.

 

이점만 봤을때는, 기존의 DNS 와 로드밸런서를 두는 것과 다른게 없는것 같습니다.

아래는 일반적인 DNS & 로드밸런서를 두어 사용하는 그림입니다.

 

 

이 구조가 나쁜것은 아닙니다.

하지만, 마이크로서비스 & 클라우드 환경에서는 아래와 같은 이유로 사용하기에 제약이 있습니다.

 

  1. 단일 장애 지점 : 로드밸런서가 전체 인프라 스트럭처의 단일 장애 지점으로, 로드 밸런서에 문제가 생기면 전체 앱도 다운이 됩니다.
  2. 수평 확장의 제약성 : 로드 밸런서 클러스터에 서비스를 모아 연결하므로 부하 분산 인프라 스트럭처를 여러 서버에 수평적으로 확장할 수 있는 능력이 제한됩니다.
  3. 정적 관리 : 로드 밸런서는 일반적으로 서비스를 신속히 등록 및 제거하기에 힘듭니다.
  4. 복잡성 : 로드 밸런서는 라우팅 테이블을 통해 프록시를 하며, 매핑 규칙을 수동으로 조작해야 하기 때문에 복잡성이 증가합니다.

 

3. 클라우드에서 서비스 디스커버리

그렇다면, 클라우드 환경에서 어떠한 점을 고려하여 디스커버리를 사용해야 할까요?

 

클라우드 환경은 서비스 인스턴스가 무수히 많아졌다가 줄어들 수 있기 때문에, 

아래와 같은 이점을 제공하는 서비스 디스커버리를 사용해야 합니다.

 

  1. 고가용성 : 클러스터 지원
  2. 피어 투 피어 : 클러스터의 노드들은 서비스의 상태를 공유
  3. 부하 분산 : 동적으로 서비스 요청의 부하 분산
  4. 회복성 : 서비스 정보를 로컬에 캐싱을 통한 회복성
  5. 장애 내성 : 비정상 서비스를 탐지하여 요청 차단

 

그럼, 위의 이점들을 제공하는 서비스 디스커버리를 선택하였으면 이제 사용해야 합니다.

 

아래는 일반적인 서비스 디스커버리의 동작 아키텍처의 개념과 그림입니다.

 

  • 서비스 등록
  • 클라이언트가 서비스 주소 검색
  • 정보 공유
  • 상태 모니터링

 

 

서비스는 구동 시 서비스 디스커버리에 자신의 물리적 주소를 등록합니다.

보통, 서비스의 각 인스턴스는 고유한 물리적 주소를 가지고 있지만 동일한 서비스 ID로 등록합니다.

동일 서비스에 대한 그룹핑을 위해서 입니다.

 

추가로, 서비스는 일반적으로 1개의 서비스 디스커버리에 등록되며, 정보는 P2P(피어투피어) 모델을 통해 공유합니다.

 

 

이 아키텍처에는 문제점이 하나 있습니다.

바로, A 서비스 인스턴스가 B 서비스 인스턴스를 호출할 때마다 서비스 디스커버리에서 B 서비스 인스턴스들의 정보를 요청하여 

사용하게 된다는 점입니다.

이는 각 서비스간의 서비스 디스커버리와의 결합도가 증가한 것이며, 서비스 디스커버리 장애 시 문제가 될 수 있습니다.

 

이러한 문제를 해결하기 위해, 클라이언트 측 부하 분산을 사용할 수 있습니다.

 

클라이언트 측 부하 분산 방법으로는 클라이언트 로컬에 요청하는 서비스들의 인스턴스 정보들을 캐싱해놓고 사용하는 것입니다.

 

아래는 클라이언트 측 부하 분산을 적용한 아키텍처 그림입니다.

 

 

1) 스프링과 넷플릭스 유레카를 사용한 서비스 디스커버리

 

스프링 클라우드와 넷플릭스에서 제공하는 기능을 이용하여 위 아키텍처를 구성할 수 있습니다.

 

아래는 스프링 클라우드와 넷플릭스 기능을 적용한 그림입니다.

 

 

그림을 보시면 유레카, 리본이라는 것을 볼 수 있습니다.

 

각 개념은 간단히 아래와 같습니다.

 

용어 의미
유레카 서비스 디스커버리로서 서비스 인스턴스들의 위치를 관리
리본 클라이언트 부하 분산을 위해 로컬에 서비스 정보들을 캐싱하며, 주기적으로 유레카에서 데이터를 조회하여 갱신합니다. 추가로 로드밸렁싱 역할을 수행

 

 

반응형

 

 

4. 스프링 유레카 서비스 구축

이제 스프링 부트를 통해 유레카 서비스를 구축하겠습니다.

 

1) 유레카 서버 라이브러리 추가

 

compile 'org.springframework.cloud:spring-cloud-starter-eureka-server:1.4.7.RELEASE'

 

2) application.yml 파일 설정

 

 

유레카는 기본적으로 모든 서비스가 등록할 기회를 갖도록 5분을 기다린 후 등록된 서비스를 공유합니다.

그리고, 서비스 인스턴스의 상태를 확인해야 하기 때문에 유레카는 10초 간격으로 연속 3회의 heartbeat를 받아야 하므로 등록된 서비스는 보여지는데 30초가 소요됩니다.

 

유레카 서버에서 서비스별 정보는 아래와 같이 REST API 를 통해 확인 할 수 있습니다.

 

http://<eureka ip>:8761/eureka/apps/<APPS ID>

 

 

3) EnableEurekaServer 어노테이션 사용

 

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}

 

위 3개의 설정으로 유레카 서버의 세팅은 완료되었습니다.

 

5. 스프링 유레카에 서비스 등록

 

이젠 서비스 측에서 위에서 세팅한 유레카 서버에 자신을 등록하는 법을 알아보겠습니다.

 

1) 유레카 클라이언트 라이브러리 등록

 

compile 'org.springframework.cloud:spring-cloud-starter-netflix-eureka-client:2.2.3.RELEASE'

 

2) application.yml 파일 설정

 

 

유레카 클라이언트는 아래 두가지 구성 요소가 필요합니다.

 

  1. 어플리케이션 ID : 서비스 인스턴스의 그룹을 의미하며, spring.application.name 으로 설정한 값으로 세팅되어 집니다.
  2. 인스턴스 ID : 개별 서비스 인스턴스를 인식하는 임의의 숫자입니다.

위 fetchRegistry 설정은 서비스 레지스트리에서 가져온 정보를 로컬에 캐싱할지의 설정입니다.

 

6. 서비스 디스커버리를 사용한 서비스 검색

이젠 유레카에 등록을 했으니 데이터를 받아와 사용하는 법을 알아보겠습니다.

 

유레카에서 데이터를 받아와 사용하는 방법은 아래와 같이 3가지가 있습니다.

 

  1. 스프링 디스커버리 클라이언트
  2. RestTemplate 가 활성화된 스프링 디스커버리 클라이언트
  3. 넷플릭스 Feign 클라이언트

 

1) 스프링 DiscoveryClient로 서비스 인스턴스 검색

 

먼저 아래와 같이, @EnableDiscoveryClient 를 사용하여 앱에서 DiscoveryClient를 DI 할 수 있도록 합니다.

 

@SpringBootApplication
@EnableDiscoveryClient
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

 

아래는 DiscoveryClient 를 통해 조직 서비스에 call을 날려 데이터를 가져오는 메소드 입니다.

 

@Component
public class OrganizationDiscoveryClient {

    @Autowired
    private DiscoveryClient discoveryClient;

    public Organization getOrganization(String organizationId) {
        RestTemplate restTemplate = new RestTemplate();
        List<ServiceInstance> instances = discoveryClient.getInstances("organizationservice");

        if (instances.size()==0) return null;
        String serviceUri = String.format("%s/v1/organizations/%s",instances.get(0).getUri().toString(), organizationId);
    
        ResponseEntity< Organization > restExchange =
                restTemplate.exchange(
                        serviceUri,
                        HttpMethod.GET,
                        null, Organization.class, organizationId);

        return restExchange.getBody();
    }
}

 

 

위의 소스는 저수준 코드로 보시면 됩니다.

직접 유레카서버에서 조직 서비스에 대한 인스턴스들을 가져와 자체적으로 로드밸렁싱을 하기 때문입니다.

 

 

2) 리본 지원 스프링 RestTemplate을 사용한 서비스 호출

 

@LoadBalanced 를 통해 리본이 지원하는 RestTemplate 를 생성하도록 지정합니다.

 

@LoadBalanced
@Bean
public RestTemplate getRestTemplate() {
    return new RestTemplate();
}
@Component
public class OrganizationRestTemplateClient {
    @Autowired
    RestTemplate restTemplate;

    public Organization getOrganization(String organizationId){
        ResponseEntity<Organization> restExchange =
                restTemplate.exchange(
                        "http://organizationservice/v1/organizations/{organizationId}",
                        HttpMethod.GET,
                        null, Organization.class, organizationId);

        return restExchange.getBody();
    }
}

 

위 코드를 보면, 첫번째 방법보다 개발자의 작업이 줄어든것을 볼 수 있습니다.

리본 RestTemplate는 내부적으로 URL에 명시한 APP ID를 통해 캐싱된 서비스 인스턴스에서 호출합니다.

자체적으로 라운드 로빈으로 인스턴스들을 호출하기 때문에 클라이언트측에서 로드밸런싱을 하고 있다고 보시면 됩니다.

 

3) 넷플릭스 Feign 클라이언트로 서비스 호출

 

@EnableFeignClients 를 사용하여 FeignClient를 사용할 수 있도록 합니다.

 

@SpringBootApplication
@EnableFeignClients
public class Application {
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

 

@FeignClient("organizationservice")
public interface OrganizationFeignClient {
    @RequestMapping(
            method= RequestMethod.GET,
            value="/v1/organizations/{organizationId}",
            consumes="application/json")
    Organization getOrganization(@PathVariable("organizationId") String organizationId);
}

 

위 소스는 가장 추상화가 되어진 코드로 보시면 됩니다.

코드를 보시면 아시겠지만 스프링 클라우드는 Controller와 비슷하게 작성할 수 있도록 개발자에게 제공하였습니다.

 

내부적으로 리본이 사용되기 때문에 로컬 캐싱 및 RR과 같은 로드밸런싱도 동작합니다.

 

7. 마무리

이번 포스팅에서는 서비스 디스커버리에 대해 알아보았습니다.

다음에는 나쁜 상황에 대비한 스프링 클라우드와 넷플릭스 히스트릭스의 클라이언트 회복성 패턴에 대해 포스팅하겠습니다.

반응형
반응형

1. 서론

이번 포스팅에서는 3장인 스프링 클라우드 컨피그 서버로 구성 관리에 대해 알아보도록 하겠습니다.

2. 구성(그리고 복잡성) 관리

클라우드에서 실행되는 마이크로 서비스의 구성관리는 매우 중요합니다.

사람이 수동으로 구성 및 배포 시 예상치 못한 장애가 발생할 확률이 올라가고, 유연하게 앱을 확장하기가 힘들어 집니다.

 

때문에, 구성관리는 아래의 네 가지 원칙을 고려해야 합니다.

 

  1. 분리 : 실제 물리적인 서비스의 배포와 서비스 구성 정보를 완전히 분리.
  2. 추상화 : 서비스 인터페이스 뒷 단에 있는 구성 데이터의 접근 방식을 추상화.
  3. 중앙 집중화 : 어플리케이션의 구성 정보를 가능한 소수 저장소에 집중화.
  4. 견고성 : 고가용성과 다중성 포함.

 

1) 구성 관리 아키텍처

 

구성관리는 2장에서 살펴본 부트스트래핑 단계입니다.

 

 

부트스트래핑 단계의 과정은 아래와 같습니다.

 

 

  1. 클라이언트는 구성 관리 서비스에게 요청하여 정보를 얻어옵니다.
  2. 실제 구성 정보는 별도 저장소에 있으며 구성 관리 서비스는 저장소에서 조회하여 제공합니다.
  3. 개발자들은 어플리케이션과는 별도로 구성정보를 변경할 수 있도록 빌드 및 배포 파이프라인을 만들어 운영합니다.
  4. 구성관리 서비스는 저장소의 변경을 탐지하여 어플리케이션이 갱신하도록 합니다.

 

이러한 구성관리를 위한 오픈 소스 솔루션으로는 아래와 같이 여러 가지가 있습니다.

  • Etcd
  • 유레카
  • 콘설(Consul)
  • 주키퍼(Zookeeper)
  • 스프링 클라우드 컨피그 서버

 

 

반응형

 

 

3. 스프링 클라우드 컨피그 서버 구축

여기서는 위 종류 중에 스프링 클라우드 컨피그 서버를 사용하겠습니다.

스프링 클라우드 컨피그 서버는 스프링 부트로 만든 REST 기반의 어플리케이션입니다.

 

컨피그 서버를 사용하기 위해서는 일단, 아래와 같이 dependency를 추가해줍니다.

 

compile 'org.springframework.cloud:spring-cloud-config-server:2.2.3.RELEASE'
compile 'org.springframework.cloud:spring-cloud-starter-config:2.2.3.RELEASE'

 

그 후, resources/application.yml 파일에 컨피그 서버 정보를 기입합니다.

기입할 정보는 아래와 같습니다.

 

  • 컨피그 서비스가 수신 대기할 포트
  • 구성 데이터를 제공하는 백엔드 위치

이제 컨피그 서버가 서비스별 제공할 구성 정보를 만듭니다.

컨피그 서버에서 어플리케이션 구성 파일의 명명 규칙은 appname-env-yml 입니다.

 

2장에서 살펴본 라이선싱 서비스의 구성정보를 예로 한다면 아래와 같이 파일을 구성하시면 됩니다.

 

  • file path : resources/config/licensingservice/
  • file name : licensingservice.yml, licensingservice-dev.yml, licensingservice-real.yml

 

1) 컨피그 부트스트랩 클래스 설정

 

스프링 클라우드에서는 아래와 같이 @EnableConfigServer 어노테이션을 사용하여 컨피그 서버로 등록할 수 있습니다.

 

@SpringBootApplication
@EnableConfigServer
public class ConfigServerApplication {
	public static void main(String[] args) {
		SpringApplication.run(ConfigServerApplication.class, args);
	}
}

 

 

2) 파일 시스템과 스프링 클라우드 컨피그 서버 사용

 

컨피그 서버는 application.yml를 사용하여 앱의 구성 데이터를 보관할 저장소를 지정합니다.

 

가장 간편한것은 파일 시스템 기반이며, 아래는 컨피그 서버의 application.yml 예제입니다.

 

server:
   port: 8888
spring:
  profiles:
    active: native
  cloud:
     config:
       server:
           native:
              searchLocations: file://<chapter 3>/confsvr/src/main/resources/config/licensingservice,
                               file://<chapter 3>confsvr/src/main/resources/config/organizationservice

 

application.yml에서 주의 깊게 볼것은 마지막의 spring.cloud.config.server.native.searchLocations 입니다.

 

각 앱별로 제공할 구성 데이터의 파일 위치를 쉼표를 구분자로 기입해주시면 됩니다.

 

이제 컨피그 서버를 띄우신 다음, 컨피그 서버가 제공하는 구성 데이터를 확인하고 싶으시다면 아래 url을 통해 확인이 가능합니다.

프로퍼티별 정보를 보고 싶다면 uri의 마지막 default를 dev 혹은 real과 같이 구성한 프로퍼티로 수정하면 됩니다.

 

http://localhost:8888/licensingservice/default

 

4. 스프링 클라우드 컨피그와 스프링 부트 클라이언트의 통합

이번에는 컨피그 서버를 이용하는 클라이언트에 대해 진행하도록 하겠습니다.

 

1) 컨피그 서버 클라이언트 라이브러리 추가

 

compile 'org.springframework.cloud:spring-cloud-config-client:2.2.3.RELEASE'

 

2) bootstrap.yml, application.yml 정보 추가

 

클라이언트는 resources 디렉터리 하위에 bootstrap.yml 혹은 application.yml에 컨피그 서버에 대한 정보를 넣으면 됩니다.

 

일반적으로 bootstrap.yml 에는 아래 정보들을 명시합니다.

  • 서비스 어플리케이션 이름
  • 어플리케이션 프로파일
  • 스프링 클라우드 컨피그 서버에 접속할 URI
spring:
  application:
    name: licensingservice
  profiles:
    active: default
  cloud:
    config:
      uri: http://localhost:8888

 

위 spring.application.name 은 컨피그 서버의 디렉터리 이름과 일치해야합니다.

즉, 컨피그 서버에 licensingservice 디렉터리가 있어야 합니다.

 

spring.cloud.config.uri 에는 클라이언트가 접속할 컨피그 서버의 엔드포인트를 기입합니다.

 

일반적으로 application.yml에는 컨피그 서버를 사용하지 못하는 경우에도 사용할 구성 데이터를 명시합니다.

 

 

3) 클라이언트 구성 정보 확인

 

위 컨피그 서버의 구성 정보는 http://localhost:8888/licensingservice/default 를 통해 알았습니다.

그럼, 클라이언트는 컨피그 서버한테 구성 정보를 잘 받아 왔는지 어떻게 확인 할 수 있을까요?

 

스프링 액츄에이터를 사용하면 간편히 http 호출로 확인이 가능합니다.

 

스프링 액츄에이터 라이브러리를 추가 후 아래와 같이 url을 호출하시면 됩니다.

 

http://localhost:8080/actuator/env

 

4) 깃과 스프링 클라우드 컨피그 서버 사용

 

위에서 살펴본 방법은 파일시스템을 이용하여 컨피그 서버의 구성 정보를 관리 했습니다.

 

컨피그 서버는 파일시스템이 아닌 깃과도 연동이 됩니다.

 

깃과 연동할 시에는 컨피그 서버의 application.yml을 아래와 같이 수정해줍니다.

 

server:
  port: 8888
spring:
  cloud:
    config:
      discovery:
        enabled: true
      server:
        encrypt.enabled: false
        git:
          uri: https://github.com/klimtever/config-repo/
          searchPaths: licensingservice,organizationservice
          username: native-cloud-apps
          password: 0ffended

 

spring.cloud.config.server.git.uri 에 깃 레파지토리 주소를 기입합니다.

spring.cloud.config.server.git.searchPaths 는 컨피그 서버가 호스팅하는 서비스들이며 쉼표 구분자를 통해 기입합니다.

searchPaths에 있는 서비스들은 실제로 깃에 디렉터리로 존재해야 합니다.

 

5) 프로퍼티 갱신

 

컨피그 서버가 관리하는 구성 정보가 변경된 경우, 클라이언트들은 변경된 구성정보를 어떻게 다시 가져 올 수 있을까요?

일반적으로 클라이언트는 처음 구동시에만 컨피그 서버에서 정보를 가져옵니다.

 

스프링 액츄에이터는 이를 위해 @RefreshScope 어노테이션을 제공합니다.

@RefreshScope 를 명시한 어플리케이션은 /refresh 엔드포인트가 생성되며 호출 시 스프링 프로퍼티만을 다시 로드합니다.

여기서 스프링 프로퍼티는 컨피그서버에 있기 때문에 다시 컨피그서버에서 가져오게 됩니다.

 

@SpringBootApplication
@RefreshScope
public class Application { 
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
  }
}

 

5. 중요한 구성 정보 보호

구성 정보에는 노출되면 안되고, 암호화 된 정보로 있어야 할 수 있습니다.

 

이를 위해 스프링 클라우드 컨피그 서버는 쉽게 대칭 및 비대칭 암호화 방법을 제공합니다.

 

구성에 따라, 컨피그 서버에서 암호화를 하여 클라이언트에 제공할 수 있으며,

반대로, 암호화된 정보를 컨피그 서버가 가지고 있고 클라이언트가 복호화하여 사용하게 할 수도 있습니다.

6. 마무리

이번 포스팅에서는 스프링 클라우드 컨피그 서버로 구성 관리에 대해 알아보았습니다.

다음에는 서비스 디스커버리에 대해 포스팅하겠습니다.

반응형
반응형

1. 서론

이번 포스팅에서는 2장인 테스트에 대해 알아보도록 하겠습니다.

 

2. UserDaoTest 다시보기

스프링이 개발자에게 제공하는 가장 중요한 것은 객체지향적인 설계와 테스트입니다.

 

테스트는 만들어진 코드를 확신할 수 있게 해주며, 변화에 유연하게 대처할 수 있는 자신감을 가져다 줍니다.

 

일반적인 웹 프로그램에서 테스트를 할 때, 개발자들은 실수를 합니다.

 

바로, 서비스 계층, MVC 프레젠테이션 계층, JSP 계층까지 모두 개발을 완료한 후 브라우저 혹은 curl과 같이 직접 http request를 통해 

어찌보면 한 요청의 통합테스트를 진행한다는 점입니다.

 

물론, 바로 원하는 대로 동작을 하면 좋겠지만 기본적으로 개발자들은 자신이 만든 코드를 의심해야합니다.

실패 시, 어느 계층에서 에러가 발생했는지 트레이싱이 힘들며 변화에 있어서 유연하지 못합니다.

 

작은 단위의 테스트

 

위 같은 문제를 해결하기 위해 개발자들은 가능한 작은 단위로 테스트를 작성하고 수행해야 합니다.

 

이런 작은 단위의 코드를 테스트하는 것을 단위테스트( = Unit Test )라고 일컫습니다.

이 단위 테스트의 장점은 개발자가 설계하고 만든 코드가 원래 의도한 대로 동작하는지를 개발자 스스로 빠르게 확인받을 수 있다는 점입니다.

 

그렇다면, 단위 테스트만 있으면 되고 위에 같이 한 요청에 대한 통합 테스트는 필요 없을까요?

아닙니다. 각 단위의 기능은 테스트시 통과 할 지라도 기능들을 엮었을 땐, 실패할 때가 있기 때문에 단위 테스트 다음엔 통합 테스트로 각 단위의 조합까지 테스트를 하는 것이 가장 이상적입니다.

 

그럼, 1장에서 만들었던 UserDaoTest를 봐보겠습니다.

 

UserDaoTest는 아래와 같습니다.

 

public class UserDaoTest {

    public static void main(String[] args) throws SQLException, ClassNotFoundException {

        ApplicationContext context = new GenericXmlApplicationContext("applicationContext.xml");

        UserDao dao = context.getBean("userDao", UserDao.class);
        
        User user = new User();
        user.setId("user");
        user.setName("백기선");
        user.setPassword("married");
        
        dao.add(user);

        System.out.println(user.getId() + " 등록 성공");
        
        User user2 = dao.get(user.getId());
        System.out.println(user2.getName());
        System.out.println(user2.getPassword());

        System.out.println(user2.getId() + " 조회 성공");
    }
}

 

여기에는 몇가지 문제점이 있습니다.

 

  1. 수동 확인 작업의 번거로움 : 기능에 대한 검증을 사람이 직접 콘솔에 찍히는 것을 보고 수동 확인해야 한드는 점입니다.
  2. 실행 작업의 번거로움 : 하나의 Dao에 main문을 만들었기 때문에, 매번 테스트를 수행하기 위해서는 main을 수행해야 하며, Dao가 많아 질수록 main 문이 늘어난다는 점입니다.

 

3. UserDaoTest 개선

위 UserDaoTest의 문제점들을 단계적으로 개선하겠습니다.

 

먼저, 수동 확인의 번거로움입니다. 

현재는 콘솔에 찍히는 "등록 성공" 과 "조회 성공"으로 테스트 통과여부를 판단하고 있습니다.

 

그렇다면 저 2개의 콘솔 출력만 된다면 성공일까요?

아닙니다. 테스트는 에러가 나지 않았더라도 예상한것과 다르게 기능이 동작했을 수 있기 때문입니다.

 

때문에 아래와 같이 코드를 수정하겠습니다.

 

if (!user.getName().equals(user2.getName())) {
    System.out.println("테스트 실패 (name)");    
} else if (!user.getPassword().equals(user2.getPassword())) {
    System.out.println("테스트 실패 (password)");
} else {
    System.out.println("조회 테스트 성공");
}

 

이제, name과 password를 일일이 수동 확인하지 않고 콘솔 출력으로 "조회 성공" 만 나오는것으로 테스트 확인이 가능해졌습니다.

만약, 값이 이상하게 저장되었다면 콘솔 출력된것을 보고 수정 후 다시 한번 테스트를 수행하면 될 것입니다.

 

하지만, 여기에도 문제점은 존재합니다.

우선 if - else if - else 문으로 인해 User에 대해 검증할 필드들이 늘어날 수록 코드의 장황함은 커질 것 입니다.

또한, 콘솔 출력을 일일이 확인해야 하기 때문에 완벽히 수동 확인의 번거로움을 해소한것이 아닙니다.

 

이런 부분은 실행 부분의 번거로움을 개선하기 위해 사용할 JUnit 테스트로 전환하면서 같이 개선이 되어집니다.

 

JUnit 이란, 자바로 단위 테스트를 만들때 사용할 수 있는 프레임워크로 스프링과 궁합이 좋습니다.

 

JUnit 은 위와 같이 main 문이 필요없으며, 제공하는 어노테이션들을 사용하여 손쉽게 테스트 코드 작성과 검증이 가능합니다.

 

아래는 UserDaoTest에 JUnit을 적용한 코드입니다.

 

public class UserDaoTest {
    
    @Test
    public void addAndGet() throws SQLException, ClassNotFoundException {
        ApplicationContext context = new GenericXmlApplicationContext("applicationContext.xml");

        UserDao dao = context.getBean("userDao", UserDao.class);

        User user = new User();
        user.setId("gyumee");
        user.setName("박성철");
        user.setPassword("springno1");

        dao.add(user);

        System.out.println(user.getId() + " 등록 성공");

        User user2 = dao.get(user.getId());

        Assertions.assertEquals(user2.getName(), user.getName());
        Assertions.assertEquals(user2.getPassword(), user.getPassword());
    }
}

 

 

@Test 어노테이션과, public void 형식으로 선언되어 있다면 JUnit에서는 테스트를 원하는 소스로 판단하여 테스트를 수행하게 됩니다.

 

여기서, 또 한가지 볼 수 있는것은 Assertions.assertEquals 메서드입니다.

이는 junit에서 제공하는 메서드로서 기대값과 실제값을 인자로 받아 개발자가 예상한 값과 같은지 확인합니다.

위, Assertions.assertEquals 메서드는 junit 버전에 따라 약간 상이할 수 있습니다.

 

JUnit에서는 위와 같은 방법으로 굳이 불필요한 콘솔 출력이 필요 없게 되며 main 문도 사라졌습니다.

또한, 한 테스트 클래스에 여러개의 @Test 어노테이션을 통해 테스트 코드의 확장도 가능해졌습니다.

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

4. 개발자를 위한 테스팅 프레임워크 JUnit

자바 개발자는 필수적으로 Junit을 사용할 수 있어야합니다.

이러한, Junit 은 일반적으로 이클립스 혹은 인텔리제이과 같은 IDE에서 모두 지원이 되므로 편하게 사용할 수 있습니다.

ide가 아닌 메이븐, 그래들과 같은 빌드 툴로도 테스트 수행은 가능합니다.

 

위 UserDaoTest에는 사실 아직도 문제점이 있습니다. 바로 테스트 결과의 일관성이 침해한다는 점입니다.

addAndGet를 테스트한 후에는 DB에 데이터를 지워야 다음에 테스트틀 다시 수행할 때 영향이 없기 때문입니다.

그렇다는것은 해당 테스트는 DB 데이터에 의존성이 강하게 있다는 것을 의미합니다.

 

단위 테스트는 코드가 바뀌지 않는다면, 매번 실행할 때마다 동일한 테스트 결과를 얻을 수 있어야 합니다.

 

이를, 개선하기 위해 아래 2개의 메서드를 추가하도록 하겠습니다.

 

  • deleteAll()
  • getCount()
public void deleteAll() throws SQLException, ClassNotFoundException {
    Connection c = connectionMaker.makeConnection();
    
    PreparedStatement ps = c.prepareStatement("delete from users");
    ps.executeUpdate();
    
    ps.close();
    c.close();
}

public int getCount() throws SQLException, ClassNotFoundException {
    Connection c = connectionMaker.makeConnection();
    PreparedStatement ps = c.prepareStatement("select count(*) from users");
    
    ResultSet rs = ps.executeQuery();
    rs.next();
    int count = rs.getInt(1);
    
    rs.close();
    ps.close();
    c.close();
    
    return count;
}

 

위와 같은, 기능을 추가하였으니 이를 확인하기 위해 테스트 코드 또한 작성해보겠습니다.

 

public class UserDaoTest {

    @Test
    public void addAndGet() throws SQLException, ClassNotFoundException {
        ApplicationContext context = new GenericXmlApplicationContext("applicationContext.xml");

        UserDao dao = context.getBean("userDao", UserDao.class);
        
        dao.deleteAll();
        Assertions.assertEquals(dao.getCount(), 0);
        
        User user = new User();
        user.setId("gyumee");
        user.setName("박성철");
        user.setPassword("springno1");

        dao.add(user);
        Assertions.assertEquals(dao.getCount(), 1);

        User user2 = dao.get(user.getId());

        Assertions.assertEquals(user2.getName(), user.getName());
        Assertions.assertEquals(user2.getPassword(), user.getPassword());
    }
}

 

 

이제, addAndGet 을 수행 할 시 deleteAll 로 인해 테스트 수행 후 DB에 데이터를 직접 지우는 불필요한 작업은 없어졌습니다.

또한, getCount을 통하여 deleteAll과 add 메서드에 대해 검증작업 또한, 추가되었습니다.

 

하지만 위에는 getCount가 정상적으로 동작했을때를 가정되어 있기 때문에, getCount에 대한 테스트도 필요합니다.

 

@Test
public void count() throws SQLException, ClassNotFoundException {
    ApplicationContext context = new GenericXmlApplicationContext("applicationContext.xml");
    UserDao dao = context.getBean("userDao", UserDao.class);
    
    User user1 = new User("gyumee", "springno1", "박성철");
    User user2 = new User("leegw700", "springno2", "이길원");
    User user3 = new User("bumjin", "springno3", "박범진");
    
    dao.deleteAll();
    Assertions.assertEquals(dao.getCount(), 0);

    dao.add(user1);
    Assertions.assertEquals(dao.getCount(), 1);

    dao.add(user2);
    Assertions.assertEquals(dao.getCount(), 2);
    
    dao.add(user3);
    Assertions.assertEquals(dao.getCount(), 3);
}

 

count 테스트를 위해 user를 하나씩 넣어가며 확인을 하였습니다.

 

이제 어느정도 UserDao에 대해 테스트가 된 것 같습니다.

 

하지만, 아직 조금 꺼림칙한 부분이 있습니다. 바로 get 메서드입니다.

 

get으로 받은 id 값이 DB에 없을때가 있기 때문입니다.

이런경우, 흔히 null을 반환하도록 혹은 예외를 일으키는 방법중에 선택을 합니다.

 

여기서는, 후자인 예외를 일으키는 방법을 사용하겠습니다.

 

아래와 같이 테스트 코드를 추가하겠습니다.

 

@Test
public void getUserFailure() throws SQLException, ClassNotFoundException {
    ApplicationContext context = new GenericXmlApplicationContext("applicationContext.xml");
    UserDao dao = context.getBean("userDao", UserDao.class);

    dao.deleteAll();
    Assertions.assertEquals(dao.getCount(), 0);

    Assertions.assertThrows(EmptyResultDataAccessException.class, dao.get("unknown_id"));
}

 

위는 예외를 일으켜야 성공하는 테스트 코드입니다.

예외는 지정한 EmptyResultDataAccessException 예외가 일어나야 합니다.

 

이제 위 테스트 코드를 성공시키기 위해 dao의 get 메서드를 수정하겠습니다.

 

public User get(String id) throws ClassNotFoundException, SQLException {
    Connection c = connectionMaker.makeConnection();
    PreparedStatement ps = c.prepareStatement("select * from users where id = ?");
    ps.setString(1, id);

    ResultSet rs = ps.executeQuery();
    User user = null;
    if (rs.next()) {
        user =  new User();
        user.setId(rs.getString("id"));
        user.setName(rs.getString("name"));
        user.setPassword(rs.getString("password"));
    }
    
    rs.close();
    ps.close();
    c.close();
    
    if (user == null) throw new EmptyResultDataAccessException(1);
    return user;
}

 

이제 get 메서드시 데이터가 없다면 예외를 일으키며, 테스트 또한 성공하게 될 것입니다.

 

이렇게, 개발자들은 항상 실패 케이스에 대해서 생각을 하며 테스트를 해야합니다.

책에서는, 이런 부정적인 케이스를 먼저 만드는 습관을 들이는걸 추천합니다.

 

추가로, 독자들은 지금 테스트 주도 개발인 TDD을 체험하였습니다.

 

TDD란 테스트를 먼저 만들고 그 테스트가 성공하도록 코드를 만드는 방식입니다.

getUserFailure 테스트를 먼저 만들고 이를 성공시키기 위해 get 메서드를 수정한것과 같이 말입니다.

 

TDD의 장점은 코드를 만들어 테스트를 실행하는 그 사이의 간격이 매우 짦다는 점입니다.

 

이제 UserDaoTest 를 마지막으로 리팩토링 해보겠습니다. 리팩토링의 대상은 어플리케이션 뿐만이 아닌 테스트 코드도 포함입니다.

 

UserDaoTest 를 보니 UserDao 빈을 가져오는 부분이 계속 중복됩니다.

어플리케이션 코드였다면 이를 별도의 메서드로 추출이 가능했습니다.

 

하지만, JUnit에서는 각 @Test 수행 전 세팅을 하는 기능을 제공합니다.

바로 @Before 어노테이션이 붙은 메서드를 활용한것으로 코드는 아래와 같이 변경됩니다.

 

public class UserDaoTest {
    private UserDao userDao;

    @BeforeEach
    public void setUp() {
        ApplicationContext context = new GenericXmlApplicationContext("applicationContext.xml");
        this.userDao = context.getBean("userDao", UserDao.class);
    }

    ...
}

 

junit은 각 테스트 메서드별로 테스트 클래스를 인스턴스화하여 사용합니다.

때문에, 위와 같이 userDao를 인스턴스 변수로 선언하여 사용해야 합니다.

 

아래는, JUnit의 테스트 메서드 실행 방법이 어떻게 동작하는지에 대한 그림입니다.

 

 

 

JUnit 에서는 테스트를 수행하는데 필요한 정보나 오브젝트를 픽스처라고 합니다.

위 UserDao 오브젝트도 픽스처에 해당하며 테스트에서 쓰였던 User 오브젝트들도 픽스처로 뽑을 수 있습니다.

 

public class UserDaoTest {
    
    private UserDao userDao;
    private User user1, user2, user3;
    
    @BeforeEach
    public void setUp() {
        ApplicationContext context = new GenericXmlApplicationContext("applicationContext.xml");
        this.userDao = context.getBean("userDao", UserDao.class);
        this.user1 = new User("gyumee", "springno1", "박성철");
        this.user2 = new User("leegw700", "springno2", "이길원");
        this.user3 = new User("bumjin", "springno3", "박범진");
    }
    ...

}

 

5. 스프링 테스트 적용

이제 UserDaoTest 클래스에서 더는 개선할 포인트가 없을까요?

JUnit은 매 테스트마다 테스트 클래스가 인스턴스되어 수행된다고 했습니다.

그렇다면, ApplicationContext도 역시 @Test 갯수만큼 생성이 됩니다.

 

하지만, Application 같은 경우에는 bean의 관리 및 DI를 해주는 역할로 굳이 각 인스턴스 변수로 있을 필요가 없습니다.

또한, Application은 초기화 비용이 크기 때문에 테스트 수행시간은 느려지게 될 것입니다.

테스트 코드는 가능한 빠르고 독립적이어야 좋은 테스트 코드입니다.

 

스프링에서는 이를 위해 JUnit을 이용한 편의성을 추가로 제공합니다.

이런 부분에서 스프링과 JUnit은 궁합이 잘 맞는다고 할 수 있습니다.

 

아래는 스프링 테스트 컨텍스트 프레임워크를 적용한 UserDaoTest 코드입니다.

junit 5 이하 버전에서는 @ExtendWith(SpringExtension.class)가 아닌 @RunWith(SpringRunner.class)로 사용하면 됩니다.
@ExtendWith(SpringExtension.class)
@ContextConfiguration(locations = "/applicationContext.xml")
public class UserDaoTest {
    
    @Autowired
    private ApplicationContext context;
    
    private UserDao userDao;
    private User user1, user2, user3;
    
    @BeforeEach
    public void setUp() {
        this.userDao = context.getBean("userDao", UserDao.class);
        this.user1 = new User("gyumee", "springno1", "박성철");
        this.user2 = new User("leegw700", "springno2", "이길원");
        this.user3 = new User("bumjin", "springno3", "박범진");
    }
    
    ...
}

 

SpringExtension 은 spring-test 에서 제공하는 클래스로서 테스트를 진행하는 중에 테스트가 사용할 어플리케이션 컨텍스트를 만들고 관리하는 작업을 진행해줍니다.

 

때문에, 위와 같이 변경하게되면 각 테스트가 돌아갈 때 인스턴스는 다르더라도 context의 인스턴스는 공유하게 됩니다.

따라서, 수백개의 테스트 코드가 있더라도 이제 context의 초기 비용은 처음 테스트 코드에서만 들고 다음부턴 들지 않아 빠른 속도로 수행됩니다.

 

만약, context를 공유하지 않고 싶다면 class 위에 @DirtiesContext 를 붙여줍니다.
@DirtiesContext를 사용할 시에는 context를 공유하지 않고 새로 만들어서 사용하게 됩니다.

 

여기서 1장 정확히 이해한 사람은 위에 @Autowired로 인해 ApplicationContext 가 DI 된다는것에 의문을 가져야 합니다.

이유는, ApplicationContext 라는 것을 스프링에게 bean으로 만들어서 관리해달라는 코드가 어디에도 없기 때문입니다.

 

기본적으로 스프링 컨텍스트는 자기 자신도 bean화 하여 컨테이너에 등록하며, 그로인해 위와 같은 소스가 성공적으로 수행될 수 있습니다.

추가로, @Autowired DI가 걸리는 방법은 아래와 같습니다.

 

  1. 선언 클래스 타입이 빈컨테이너에 있는지 확인, 1개가 있다면 해당 빈 반환
  2. 같은 타입이 2개 이상인 경우 변수명과 동일한 빈 id가 있는지 확인 후 있다면 반환

만약, 위 2개 타입과 변수명으로도 빈을 찾지못한다면 스프링은 예외를 발생시킵니다.

 

 

테스트와 DI

 

아래와 같은 특징들로 인해 테스트 코드에서도 운영코드와 같이 DI를 적용해야 합니다.

 

  1. 소프트웨어 개발에서 절대로 바뀌지 않는 것은 없기 때문입니다.
  2. 클래스의 구현 방식은 바뀌지 않는다고 하더라도 인터페이스를 두고 DI를 적용하게 해두면 다른 차원의 서비스 기능을 도입할 수 있기 때문입니다.
  3. 효율적인 테스트를 손쉽게 만들기 위해서 입니다.

이제, UserDaoTest 에서 마지막으로 할 일이 있습니다. 바로, 테스트용 DI xml 파일을 만드는 것입니다. 

DB의 경우에는 운영과 개발, 로컬 모두 주소와 username, password가 다를 수 있습니다.

때문에, 테스트를 위한 별도 설정 정보를 만들어 사용하는것이 안전하며 관리가 용이합니다.

 

아래와 같이 applicationContext-text.xml 을 만들어 테스트코드에서 이 xml 파일을 보도록 변경시킵니다.

 

@ExtendWith(SpringExtension.class)
@ContextConfiguration(locations = "/applicationContext-test.xml")
public class UserDaoTest {

    @Autowired
    private ApplicationContext context;
    
    private UserDao userDao;
    private User user1, user2, user3;
    
    ...
}

 

 

DI을 이용한 테스트 방법 선택

 

스프링 기반 프로젝트에서는 테스트코드를 모두 테스트 컨테이너를 구동하는 방식으로 사용해야 할까요?

정답은 아닙니다.

 

위에서 살펴본것처럼 @ExtendWith가 없어도 테스트는 모두 동일하게 수행되어 집니다.

오히려, 어떤 경우에는 스프링 컨테이너 없이 만드는 것이 스프링과의 의존성 없이 클래스 메서드에 대해서 순수하게 테스트하는 것입니다.

 

개발자는 우선적으로는 스프링 컨테이너 없이 테스트코드를 작성하려고 해야하며, 테스트 하려는 클래스가 여러 클래스와 의존관계가 있는 경우에는 스프링 컨테이너를 도입하여 테스트 코드를 작성해야 합니다.

 

물론, 의존관계 자체를 테스트할 때도 컨테이너는 필요합니다.
하지만 DI를 생성자 주입 방식으로 한다면 컨테이너가 없어도 클래스의 주입방식을 충분히 테스트가 가능합니다.
때문에, 스프링에서는 생성자 주입 방식을 권장합니다.

 

6. 학습 테스트로 배우는 스프링

학습 테스트란, 자신이 아닌 제 3자의 코드에 대해서 대신 테스트 코드를 작성하는 것을 말합니다.

 

이런 학습테스트로 얻을 수있는 장점은 아래와 같습니다.

 

  1. 다양한 조건에 따른 기능을 손쉽게 확인해볼 수 있습니다.
  2. 학습 테스트 코드를 개발 중에 참고할 수 있습니다.
  3. 프레임워크나 제품을 업그레이드 할 때 호환성 검증을 도와줍니다.
  4. 테스트 작성에 대한 좋은 훈련이 됩니다.
  5. 새로운 기술을 공부하는 과정이 즐거워집니다

 

1) 버그 테스트

 

버그테스트란, 코드에 오류가 있을 때 그 오류를 가장 잘 드러내 줄 수 있는 테스트입니다.

 

버그테스트는 일단, 실패하도록 만들어야 합니다. 후에 이 테스트가 성공하도록 운영코드를 수정하는 것입니다.

위에 TDD를 말했을때와 똑같다고 생각하시면 됩니다.

 

이러한, 버그테스트는 아래와 같은 장점이 있습니다.

 

  1. 테스트의 완성도를 높여줍니다.
  2. 버그의 내용을 명확하게 분석하게 해줍니다.
  3. 기술적인 문제를 해결하는데 도움이 됩니다.

 

7. 마무리

이번 포스팅에서는 2장 테스트에 대해 알아봤습니다.

다음에는 3장 템플릿에 대해 포스팅하겠습니다.

반응형

'Framework > Spring' 카테고리의 다른 글

(1) 오브젝트와 의존관계  (0) 2020.06.15

+ Recent posts