반응형

1. 서론

 

이번 포스팅에서는 토비의 스프링 3.1의 1장인 오브젝트와 의존관계 에 대해 알아보도록 하겠습니다.

 

2. 초난감 DAO

스프링은 자바 언어 기반의 프레임워크입니다.

때문에, 객체지향 프로그래밍이 제공하는 폭넓은 혜택을 누릴 수 있도록 기본으로 돌아가자는 것이 스프링의 핵심입니다.

스프링은 객체지향 설계와 구현을 특정하도록 강요하지는 않습니다.

다만, 효과적으로 설계와 구현을 할 수 있도록 기준을 마련하여 개발자에게 편리함을 제공합니다.

 

아래는 책에 나오는 DB에 데이터를 추가 및 조회할 수 있는 예제입니다.

 

@Setter
@Getter
public class User {
    private String id;
    private String name;
    private String password;
}

 

public class UserDao {

    public void add(User user) throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        Connection c = DriverManager.getConnection("jdbc:mysql://localhost/springbook", "spring", "book");
        PreparedStatement ps = c.prepareStatement("insert into users(id, name, password) values(?, ?, ?)");
        ps.setString(1, user.getId());
        ps.setString(2, user.getName());
        ps.setString(3, user.getPassword());

        ps.executeUpdate();

        ps.close();
        c.close();
    }

    public User get(String id) throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        Connection c = DriverManager.getConnection("jdbc:mysql://localhost/springbook", "spring", "book");
        PreparedStatement ps = c.prepareStatement("select * from users where id = ?");
        ps.setString(1, id);

        ResultSet rs = ps.executeQuery();
        rs.next();
        User user = new User();
        user.setId(rs.getString("id"));
        user.setName(rs.getString("name"));
        user.setPassword(rs.getString("password"));

        rs.close();
        ps.close();
        c.close();

        return user;
    }

}

 

위 예제코드는 정상적으로 동작하나 사실은 문제가 많은 코드입니다.

개발자가 실제로 이러한 코드를 본다면... 경악을 금치 못할것입니다.

 

이제 아래 DAO의 분리와 DAO의 확장을 통해 위의 코드를 리팩토링하며 좀 더 나은 객체지향 설계로 변경하도록 하겠습니다.

 

3. DAO의 분리

위 예제를 리팩토링하기 전에, 먼저 관심사 분리에 대해서 설명하겠습니다.

 

자바 프로그램은 객체지향 장점으로 지속적인 변화에 매우 유용한 언어입니다.

그렇다면, 객체지향 장점이 그럼 무엇일까요?

바로, 관심사의 분리입니다.

 

즉, 관심이 같은 것끼리는 하나로 관심이 다른 것은 다르게 하여 실제 세계의 개념들을 객체라는 것으로 만들 수 있는 것을 의미합니다.

 

그렇다면, 위 초난감 DAO의 관심사항은 무엇일까요?

 

아마, 아래와 같은 관심사항으로 정의할 수 있습니다.

 

  1. DB와 연결을 위한 커넥션을 어떻게 가져올까라는 관심.
  2. 사용자 등록을 위해 DB에 보낼 SQL 문장을 담을 Statement를 만들고 실행하는 관심
  3. 작업이 끝나면 리소스인 Statement와 Connection 오브젝트를 닫아, 리소스 낭비를 일으키지 않는 관심

 

먼저, 중복 코드인 DB 연결 코드를 메소드로 추출하는 작업을 해보겠습니다.

현재는 2개의 메소드 밖에 없지만, 메소드가 10개 100개가 된다면 메소드를 만들 때 마다 Connection 소스가 중복으로 존재하게 됩니다.

 

아래는 getConnection이라는 메소드를 통해 DB 연결을 하는 중복코드를 제거한 코드입니다.

 

public class UserDao {

    public void add(User user) throws ClassNotFoundException, SQLException {
        Connection c = getConnection();
        PreparedStatement ps = c.prepareStatement("insert into users(id, name, password) values(?, ?, ?)");
        ps.setString(1, user.getId());
        ps.setString(2, user.getName());
        ps.setString(3, user.getPassword());

        ps.executeUpdate();

        ps.close();
        c.close();
    }

    public User get(String id) throws ClassNotFoundException, SQLException {
        Connection c = getConnection();
        PreparedStatement ps = c.prepareStatement("select * from users where id = ?");
        ps.setString(1, id);

        ResultSet rs = ps.executeQuery();
        rs.next();
        User user = new User();
        user.setId(rs.getString("id"));
        user.setName(rs.getString("name"));
        user.setPassword(rs.getString("password"));

        rs.close();
        ps.close();
        c.close();

        return user;
    }
    
    private Connection getConnection() throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        Connection c = DriverManager.getConnection("jdbc:mysql://localhost/springbook", "spring", "book");
        return c;
    }

}

 

위와같이 변경함으로, 이제 DB 종류 혹은 접속 정보가 변경되더라도 getConnection 메소드만 수정하여 문제를 해결할 수 있게 되었습니다.

 

 

만약, UserDao를 소스를 외부에 제공하기 위해서는 메소드 추출만으로는 유연한 코드가 될 수 없습니다.

UserDao를 사용하는 클라이언트 입장에서 자신들이 사용하는 DB와 접속 정보에 따라 UserDao 코드를 수정해야 하기 때문입니다.

 

좀 더 유연한 UserDao를 만들기 위해 이번에는 자바에서 제공되는 상속을 사용하여 코드를 리팩토링 하겠습니다.

 

public abstract class UserDao {

    public void add(User user) throws ClassNotFoundException, SQLException {
        Connection c = getConnection();
        PreparedStatement ps = c.prepareStatement("insert into users(id, name, password) values(?, ?, ?)");
        ps.setString(1, user.getId());
        ps.setString(2, user.getName());
        ps.setString(3, user.getPassword());

        ps.executeUpdate();

        ps.close();
        c.close();
    }

    public User get(String id) throws ClassNotFoundException, SQLException {
        Connection c = getConnection();
        PreparedStatement ps = c.prepareStatement("select * from users where id = ?");
        ps.setString(1, id);

        ResultSet rs = ps.executeQuery();
        rs.next();
        User user = new User();
        user.setId(rs.getString("id"));
        user.setName(rs.getString("name"));
        user.setPassword(rs.getString("password"));

        rs.close();
        ps.close();
        c.close();

        return user;
    }

    public abstract Connection getConnection() throws ClassNotFoundException, SQLException;
}

 

UserDao 클래스를 추상 클래스로, getConnection을 추상 메서드로 만들어 외부에서는 자신들이 사용하는 DB에 맞게 getConnection을 오버라이딩하여 사용할 수 있도록 만들었습니다.

 

위와 같인 슈퍼클래스에 기본적인 로직의 흐름을 만들고, 중간에 서브클래스가 제정의한 것을 사용하여 동작하도록 만드는 것이
템플릿 메소드 패턴입니다.

 

추가로, 위 예제에서는 Connection이라는 인터페이스를 통해 UserDao는 어떤 기능을 사용할지에 대한 관심만을 가지게 하였고.

서브 클래스들은 어떤 식으로 Connection 기능을 제공하는지에 관심을 가지도록 나누었습니다.

 

하지만, 위와 같은 코드도 문제점은 존재합니다. 바로 상속을 사용한 문제입니다.

자바는 기본적으로 다중 상속을 지원하지 않습니다.

 

때문에, UserDao의 서브 클래스는 UserDao가 아닌 클래스는 상속을 못받는 문제가 생깁니다.

 

4. DAO의 확장

위 getConnection은 DB 연결에 대한 관심으로 UserDao와는 관심이 다르기 때문에, 아예 별도의 클래스로 분리하겠습니다.

 

public class SimpleConnectionMaker {
    
    public Connection makeNewConnection() throws ClassNotFoundException, SQLException {
        Class.forName("com.mysql.jdbc.Driver");
        Connection c = DriverManager.getConnection("jdbc:mysql://localhost/springbook", "spring", "book");
        return c;
    }
}

 

public class UserDao {

    private SimpleConnectionMaker simpleConnectionMaker;

    public UserDao() {
        simpleConnectionMaker = new SimpleConnectionMaker();
    }

    public void add(User user) throws ClassNotFoundException, SQLException {
        Connection c = simpleConnectionMaker.makeNewConnection();
        PreparedStatement ps = c.prepareStatement("insert into users(id, name, password) values(?, ?, ?)");
        ps.setString(1, user.getId());
        ps.setString(2, user.getName());
        ps.setString(3, user.getPassword());

        ps.executeUpdate();

        ps.close();
        c.close();
    }

    public User get(String id) throws ClassNotFoundException, SQLException {
        Connection c = simpleConnectionMaker.makeNewConnection();
        PreparedStatement ps = c.prepareStatement("select * from users where id = ?");
        ps.setString(1, id);

        ResultSet rs = ps.executeQuery();
        rs.next();
        User user = new User();
        user.setId(rs.getString("id"));
        user.setName(rs.getString("name"));
        user.setPassword(rs.getString("password"));

        rs.close();
        ps.close();
        c.close();

        return user;
    }
}

 

관심사 분리를 적용하여 클래스 분리까지는 좋았습니다.

다만, 위 코드는 외부제공 시 가져야하는 connection에 대한 유연성 문제를 해결하지 못한 코드입니다.

 

이런 경우, 인터페이스를 통하여 2개의 관심사를 느슨하게 연결할 수 있습니다.

 

public interface ConnectionMaker {
    
    Connection makeConnection() throws ClassNotFoundException, SQLException;
}

 

public class UserDao {

    private ConnectionMaker connectionMaker;

    public UserDao(ConnectionMaker connectionMaker) {
        this.connectionMaker = connectionMaker;
    }

    public void add(User user) throws ClassNotFoundException, SQLException {
        Connection c = connectionMaker.makeConnection();
        PreparedStatement ps = c.prepareStatement("insert into users(id, name, password) values(?, ?, ?)");
        ps.setString(1, user.getId());
        ps.setString(2, user.getName());
        ps.setString(3, user.getPassword());

        ps.executeUpdate();

        ps.close();
        c.close();
    }

    public User get(String id) throws ClassNotFoundException, SQLException {
        Connection c = connectionMaker.makeConnection();
        PreparedStatement ps = c.prepareStatement("select * from users where id = ?");
        ps.setString(1, id);

        ResultSet rs = ps.executeQuery();
        rs.next();
        User user = new User();
        user.setId(rs.getString("id"));
        user.setName(rs.getString("name"));
        user.setPassword(rs.getString("password"));

        rs.close();
        ps.close();
        c.close();

        return user;
    }
}

 

ConnectionMaker라는 인터페이스를 하나 만들어, UserDao가 인스턴스 시 해당 인터페이스의 구현체를 전달받아 사용하도록 하는 코드로 변경하였습니다.

 

사실상, 위 코드는 UserDao를 사용하는 제3의 클라이언트가 런타임 시 UserDao와 ConnectionMaker와의 관계를 갖도록 책임을 위임한 코드입니다.

 

그렇다면, 클라이언트는 ConnectionMaker의 구현체를 만들어 new UserDao(connectionImpl); 같은 코드로 프로그래밍해야 합니다.

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

5. 제어의 역전(IoC)

일반적으로, 팩토리는 객체의 생성 방법을 결정하고 그렇게 만들어진 오브젝트를 돌려주는 일을 합니다.

 

위 예제에서는 제 3의 클라이언트가 팩토리 기능을 구현해야 하는 것입니다.

이것 또한, UserDao, ConnectionMaker와는 별개로 객체의 관계 설정이라는 관심사로 분리되어 별도 클래스로 추출이 가능합니다.

 

public class DaoFactory {
    
    public UserDao userDao() {
        ConnectionMaker connectionMaker = () -> {
            Class.forName("com.mysql.jdbc.Driver");
            Connection c = DriverManager.getConnection("jdbc:mysql://localhost/springbook", "spring", "book");
            return c;
        };
        UserDao userDao = new UserDao(connectionMaker);
        return userDao;
    }
}

 

DaoFactory를 분리했을 때 얻을 수 있는 장점은 아래와 같습니다.

 

어플리케이션의 컴포넌트 역할을 하는 오브젝트와 어플리케이션의 구조를 결정하는 오브젝트를 분리.

 

이러한, 객체간의 관계를 제 3의 클라이언트 혹은 Factory 클래스에게 위임하는것이 바로 객체지향에서의 Ioc(제어의 역전)입니다.

 

제어의 역전에서는 아래와 같은 기능들을 제공해야 합니다.

 

  • 어플리케이션 컴포넌트의 생성과 관계설정
  • 컴포넌트 사용
  • 컴포넌트 생명주기 관리

 

6. 스프링의 IoC

 

이제 Ioc 역할인 DaoFactory를 스프링에서 사용하는 방식으로 변경하도록 하겠습니다.

 

스프링에서는 스프링이 제어권을 가지고 직접 만들고 관계를 부여하는 오브젝트를 빈(bean)이라고 부릅니다.

또한, 빈은 스프링 컨테이너가 생성과 관계설정, 사용 등을 제어해주는 제어의 역전이 적용된 오브젝트를 가리키는 말입니다.

 

스프링에서는 DaoFactory와 같이 IoC 오브젝트를 빈 팩토리라고 부르며, 동시에 어플리케이션 컨테스트라고도 일컫습니다.

실제 코드상에서는 ApplicationContext는 BeanFactory의 서브 인터페이스입니다.

 

이제 위 DaoFactory의 기능을 스프링으로 변경하겠습니다.

 

@Configuration
public class DaoFactory {

    @Bean
    public UserDao userDao() {
        UserDao userDao = new UserDao(connectionMaker());
        return userDao;
    }
    
    @Bean
    public ConnectionMaker connectionMaker() {
        return () -> {
            Class.forName("com.mysql.jdbc.Driver");
            Connection c = DriverManager.getConnection("jdbc:mysql://localhost/springbook", "spring", "book");
            return c;
        };
    }
}

 

위 코드의 @Configuration과 @Bean은 아래와 같습니다.

 

  1. @Configuration은 DaoFactory 클래스가 오브젝트 설정을 담당하는 클래스라는 것을 스프링이 인식할 수 있도록 하는 설정 표시.
  2. @Bean은 오브젝트 생성을 담당하는 IoC용 메소드라는 표시

 

이제 DaoFactory를 설정정보로 사용하는 어플리케이션 컨텍스트는 아래와 같습니다.

 

public static void main(String[] args) {
    ApplicationContext context = new AnnotationConfigApplicationContext(DaoFactory.class);
    UserDao userDao = context.getBean("userDao", UserDao.class);
    ...
}

 

DaoFactory라는 설정정보를 담고있는 클래스를 통하여 어플리케이션 컨텍스트를 생성하였습니다.

이러한, 어플리케이션 컨텍스트는 getBean 메서드를 통해 설정정보 클래스에 정의한 오브젝트를 가져올 수 있습니다.

 

getBean 메서드에서 사용한 "userDao"는 가져올 빈 이름을 의미합니다.

이 이름은 DaoFactory에서 @Bean 어노테이션을 붙인 메서드 명이 자동적으로 UserDao 빈의 이름으로 등록되어 집니다.

 

굳이 이름을 통하여 빈을 가져와야 하는 이유는, 스프링에서는 동일 타입의 인스턴스들을 빈으로 등록할 수 있기 때문입니다.

 

 

어플리케이션 컨텍스트는 DaoFactory와는 다르게 어플리케이션 전체에 IoC를 적용하여 모든 오브젝트에 대한 생성과 관계설정을 담당하는 것을 알았습니다.

 

이러한 스프링의 어플리케이션 컨텍스트 장점은 아래와 같습니다.

 

  1. 클라이언트는 구체적인 팩토리 클래스를 알 필요가 없습니다.
  2. 어플리케이션 컨텍스트는 종합 IoC 서비스를 제공해줍니다.
  3. 어플리케이션 컨텍스트는 빈을 검색하는 다양한 방법을 제공합니다.

 

7. 싱글톤 레지스트리와 오브젝트 스코프

 

스프링 어플리케이션 컨텍스트는 bean을 저장하고 관리하는 IoC 컨테이너라고 했습니다.

이때, bean은 싱글톤으로서 하나의 객체를 생성하여 bean으로 등록하여 재사용합니다.

 

싱글톤으로 만들어 사용하는 이유로는 스프링이 서버에서 주로 사용되는 프레임워크이기 때문입니다.

계속적인 객체가 생성되어 리소스 낭비가 심해지는것을 방지하기 위해서 입니다.

 

자바 디자인 패턴 중 싱글톤 패턴과는 한개의 객체를 사용하도록 한다는 개념은 같지만 약간 사용성이 다릅니다.

 

자바 싱글톤 패턴은 아래와 같습니다.

 

public class UserDao {

    private static UserDao INSTANCE;

    private UserDao() {
    }

    public static synchronized UserDao getInstance() {
        if (INSTANCE == null) INSTANCE = new UserDao();
        return INSTANCE;
    }
}

 

스프링의 bean의 싱글톤은 아래와 같이 정의할 수 있습니다.

 

public class UserDao {

    private static UserDao INSTANCE;

    public UserDao() {
    }

    public static UserDao getInstance() {
        if (INSTANCE == null) INSTANCE = new UserDao();
        return INSTANCE;
    }
}

 

차이점으로는 아래와 같습니다.

 

  1. 생성자 = bean의 경우 private 생성자가 아니기 때문에, 객체 생성에 대한 제약이 있지 않습니다.
  2. 동기화 코드 = bean의 경우 synchronized 선언이 되어 있지 않습니다. 이는 synchronized로 인해 발생하는 성능이슈를 방지하기 위함입니다.

 

차이점에서 두번째인 동기화 코드로 인해 bean 사용시 유의해야 할 점이 있습니다.

그건 바로, stateful 방식의 bean이 아닌 stateless 방식으로 빈을 생성해야 한다는 것입니다.

 

이유는 stateful 방식으로 bean을 사용하면 멀티쓰레드 환경에서 동시접근으로 인해 정확한 상태를 가질 수 없기 때문입니다.

 

 

8. 의존관계 주입(DI)

스프링에서 제공하는 또 하나의 큰 기능을 말하라고 한다면 DI(Dependency Injection) 를 말할 수 있습니다.

 

DI는 말 그대로 의존관계 주입으로 위에서 DaoFactory가 UserDao와 ConnectionMaker간의 의존성 관계를 주입하는 것을 의미합니다.

 

의존관계에는 방향성이 존재합니다.

 

위 UserDao와 ConnectionMaker의 의존관계를 표현한다면 UserDao -> ConnectionMaker로 표현할 수 있으며,

UserDao는 ConnectionMaker에 의존하고 있다고 말할 수 있습니다.

 

즉, 이 경우 UserDao는 ConnectionMaker에 의존하고 있기 때문에, ConnectionMaker 변경에 영향을 받습니다.

하지만, ConnectionMaker는 UserDao 에 의존하고 있지 않기 때문에 UserDao 변경에 영향을 받지 않습니다.

 

스프링에서는 의존관계 주입을 3가지의 방법을 통해 정의할 수 있습니다.

 

  1. 어노테이션 - @Autowired를 사용
  2. 생성자 주입
  3. setter 주입

 

위의 경우 UserDao는 생성자에 ConnectionMaker를 받고 있기 때문에 생성자 주입방식을 사용한 것을 알 수 있습니다.

 

스프링 Document에서는 DI 방식으로 생성자 주입을 권장하고 있습니다.
다만, 의존관계가 많은 클래스의 경우 생성자 코드가 커져 장황한 코드가 나올 수 있는 단점이 있습니다.
이는 lombok 과 같은 써드파티 라이브러리를 사용한다면 해결 가능합니다.

 

추가로 스프링은 DL(Dependency Lookup) 도 제공하고 있습니다.

DL은 의존관계 검색으로서 위에서 살펴본 getBean을 DL이라고 볼 수 있습니다.

 

DI는 객체간의 의존관계를 주입하기 위해 제어권을 스프링에게 넘겨야 하며, DI의 관련 객체들은 모두 bean으로 스프링 컨테이너에 등록되어야 합니다.

 

하지만 DL은 스프링 컨테이너에 등록된 bean을 검색하는 용도로,

DL을 사용하는 클래스는 bean으로 등록될 필요가 없다는 차이점을 가지고 있습니다.

 

9. 마무리

이번 포스팅에서는 오브젝트와 의존관계에 대해 간단한 소개와 설명을 했습니다.

다음에는 테스트에 대해 포스팅하겠습니다.

반응형

'Framework > Spring' 카테고리의 다른 글

(2) 테스트  (2) 2020.06.21
반응형

1. 서론

이번 포스팅에서는 Hbase가 제공하는 클라이언트 API : 관리 기능에 대해 알아보겠습니다.

 

Hbase는 데이터를 조작하는 API 뿐 만이 아닌 데이터 정의를 위한 클라이언트 API도 제공하고 있습니다.

 

2. 스키마 정의

Hbase에서는 테이블 생성 시 테이블 및 컬럼패밀리의 스키마를 정의해야 합니다.

 

1) 테이블

 

Hbase는 데이터를 저장하는 가장 상위의 개념으로 테이블을 사용합니다.

이런, 테이블의 특성을 클라이언트에서는 아래와 같은 지시자 클래스로 제공합니다.

 

TableDescriptorBuilder.newBuilder(TableName tableName).build();
TableDescriptorBuilder.newBuilder(TableDescriptor tableDescriptor).build();

TableName tableName = TableName.valueOf("tableName");
TableDescriptor tableDescriptor = new ModifyableTableDescriptor(tableName);

 

Hbase 의 테이블 명은 hdfs의 실제 디렉터리로 생성이되기 때문에 파일명 규칙에 따라 만들어야 합니다.

 

Hbase의 테이블은 논리적인 개념이며, 물리적으로는 여러개의 리전으로 분리가 되며 각 리전은 Hbase Region Server에 존재하게 됩니다.

 

아래는 위 설명을 간략하게 나타낸 그림입니다.

 

 

2) 테이블 속성

 

테이블 지시자 클래스에는 속성 설정을 위한 Getter, Setter를 제공합니다.

 

이름

 

아래와 같이 테이블 이름을 가져올 수 있습니다.

 

byte[] tableName1 = tableDescriptor.getTableName().getName();
String tableName2 = tableDescriptor.getTableName().getNameAsString();

 

수정의 경우 마이너 버전에서는 setName으로 가능했으나 현재는 테이블을 새로 생성하여 migration하는 방법을 권장하고 있습니다.

 

컬럼패밀리

 

테이블 정의 시 가장 중요한 부분은 컬럼패밀리입니다.

 

컬럼패밀리의 경우 아래와 같은 API를 통해 조작이 가능합니다.

 

tableDescriptor.getColumnFamilies();
tableDescriptor.getColumnFamily(byte[] column);
tableDescriptor.hasColumnFamily(byte[] column);
Admin admin = connection.getAdmin();
admin.addColumnFamily();
admin.deleteColumnFamily();

 

파일 최대 크기

 

테이블내의 리전이 커질 수 있는 최대 파일 크기 또한 조작이 가능합니다.

 

API 명세는 아래와 같습니다.

 

tableDescriptor.getMaxFileSize();
((ModifyableTableDescriptor) tableDescriptor).setMaxFileSize(1024000);

 

최대 크기 설정은 리전 크기가 해당 값에 도달했을때 시스템이 리전을 분할하는 기준이 됩니다.

 

 

읽기 전용

 

쓰기가 아닌 일기 전용 테이블을 만들어야 하는 경우 아래와 같은 API를 사용하면 됩니다.

 

tableDescriptor.isReadOnly();
((ModifyableTableDescriptor) tableDescriptor).setReadOnly(true);

 

 

 

멤스토어 플러시 크기

 

아래는 멤스토어에 있는 데이터를 HFile로 write하는 트리거링을 바이트 단위로 조작하고 싶은 경우 사용하는 API 입니다.

 

tableDescriptor.getMemStoreFlushSize();
((ModifyableTableDescriptor) tableDescriptor).setMemStoreFlushSize(102400);

 

 

3) 컬럼패밀리

 

TableDescriptor 와 같이 컬럼패밀리에 대한 설정을 담고 있는 클래스는 ColumnFamilyDescriptor 입니다.

 

컬럼패밀리 또한 저장소 계층의 디렉터리 이름으로 사용되기 때문에 파일명 규칙을 따라 생성해야 합니다.

 

아래는 컬럼패밀리 지시자 클래스 정의입니다.

 

ColumnFamilyDescriptor columnFamilyDescriptor1 = 
	ColumnFamilyDescriptorBuilder.newBuilder(Bytes.toBytes("")).build();
ColumnFamilyDescriptor columnFamilyDescriptor2 = 
	ColumnFamilyDescriptorBuilder.newBuilder(new ColumnFamilyDescriptorBuilder.ModifyableColumnFamilyDescriptor(Bytes.toBytes(""))).build();

 

이름

 

아래와 같이 이름을 읽을 수 있습니다.

 

byte[] name1 = columnFamilyDescriptor.getName();
String name2 = columnFamilyDescriptor.getNameAsString();

 

Setter를 통해 이름을 설정할 수 는 없습니다.

 

최대 버전 갯수

 

컬럼패밀리 별로 값의 버전을 몇 개까지 보유할 지 지정할 수 있습니다.

 

columnFamilyDescriptor.getMaxVersions();
((ModifyableColumnFamilyDescriptor)columnFamilyDescriptor).setMaxVersions(100);

 

압축

 

컬럼패밀리에 저장된 데이터에 특정 압축기법을 적용할 수 있습니다.

 

Algorithm algorithm1 = columnFamilyDescriptor.getCompactionCompressionType();
Algorithm algorithm2 = columnFamilyDescriptor.getCompressionType();
((ModifyableColumnFamilyDescriptor)columnFamilyDescriptor).setCompactionCompressionType(Algorithm.GZ);
((ModifyableColumnFamilyDescriptor)columnFamilyDescriptor).setCompressionType(Algorithm.GZ);

 

Algorithm 은 Hbase 클라이언트에서 제공하는 enum으로 아래 값들이 있습니다.

 

  • LZO
  • GZ
  • NONE
  • SNAPPY
  • LZ4
  • BZIP2
  • ZSTD
기본은 NONE으로 압축하지 않습니다.

 

블록크기

 

Hbase는 컬럼패밀리별로 HFile을 저장하게 됩니다.

그렇기 때문에, HFile의 블록 크기를 컬럼패밀리 지시자를 통해 조작이 가능합니다.

 

columnFamilyDescriptor.getBlocksize();
((ModifyableColumnFamilyDescriptor)columnFamilyDescriptor).setBlocksize(1024);

 

기본 HFile의 블록크기는 64KB로 기본 HDFS의 블록 크기인 64MB에 비교하여 1/1024 의 크기입니다.

 

블록 캐시

 

Hbase는 I/O 자원을 효율적으로 사용하기 위해 scan 연산 시 모든 블록을 읽어 메모리에 상주시켜놓고 재사용하게 합니다.

메모리에 올렸기 때문에 디스크에는 다시 접근하지 않아 I/O를 절약합니다.

 

columnFamilyDescriptor.isBlockCacheEnabled();
((ModifyableColumnFamilyDescriptor) columnFamilyDescriptor).setBlockCacheEnabled(false);

 

유효기간

 

Hbase 에서는 컬럼패밀리에 속한 데이터의 버전 뿐만이 아닌 유효기간도 설정도 가능합니다.

 

기본은 영원히 저장하도록 되어 있으며, 유효기간을 지정하게 되면 주 컴팩션시  유효기간이 지난 데이터들을 찾아 삭제합니다.

 

columnFamilyDescriptor.getTimeToLive();
((ModifyableColumnFamilyDescriptor) columnFamilyDescriptor).setTimeToLive(1000);

 

인메모리

 

위 블록캐시에서 말한 메모리에 대한 설정도 가능합니다.

 

columnFamilyDescriptor.isInMemory();
((ModifyableColumnFamilyDescriptor) columnFamilyDescriptor).setInMemory(true);

 

 

반응형

 

 

3. HBaseAdmin

Hbase에는 RDBMS의 DDL과 흡사한 기능을 제공하는 HBaseAdmin 클래스를 제공합니다.

 

HBaseAdmin 인스턴스는 아래와 같이 가져올 수 있습니다.

 

Configuration conf = HBaseConfiguration.create();
conf.set("hbase.zookeeper.quorum", "");

Connection connection = ConnectionFactory.createConnection(conf);
HBaseAdmin hBaseAdmin = (HBaseAdmin) connection.getAdmin();

 

1) 기본 기능

 

HBaseAdmin은 아래와 같은 기본 기능을 제공합니다.

 

1. getConnection()

 

연결 인스턴스를 반환하는 메소드입니다.

 

2. getConfiguration()

 

HBaseAdmin 인스턴스를 생성할 때 사용된 설정 인스턴스를 반환합니다.

 

3. close()

 

HBaseAdmin 인스턴스가 점유하고 있는 모든 자원을 해제합니다.

 

 

2) 테이블 관련 기능

 

HBaseAdmin은 DDL과 같이 테이블 생성을 위한 메서드를 제공합니다.

 

1. 생성

 

void createTable(TableDescriptor desc) throws IOException;
void createTable(TableDescriptor desc, byte[] startKey, byte[] endKey, int numRegions) throws IOException;
void createTable(final TableDescriptor desc, byte [][] splitKeys);

 

메서드 2번째를 보시면 테이블 생성 시 row key의 시작키와 끝키를 지정하여 특정 리전에 존재하는 테이블을 생성시킬 수 있습니다.

이때, 시작키는 끝키보다 작아야하며, 기본적으로 Hbase 의 리전은 시작키를 포함하고 끝키는 포함하지 않습니다.

numResions는 테이블이 확보해야하는 최소 리전 수를 의미하며
최소 3 이상이어야 합니다. 

 

아래는 테이블 생성하는 간단한 예제입니다.

 

Connection connection = ConnectionFactory.createConnection(HBaseConfiguration.create());
HBaseAdmin hBaseAdmin = (HBaseAdmin) connection.getAdmin();
TableDescriptor desc = TableDescriptorBuilder.newBuilder(TableName.valueOf("testTable")).build();
hBaseAdmin.createTable(desc);

 

 

2. 조회

 

테이블이 존재하는지, 혹은 지정한 테이블의 스키마 정의를 아래와 같은 메서드를 통해 가져올 수 있습니다.

 

boolean tableExists(final TableName tableName) throws IOException;
HTableDescriptor[] listTables() throws IOException;
HTableDescriptor[] listTables(Pattern pattern) throws IOException;
HTableDescriptor[] listTables(String regex) throws IOException;
HTableDescriptor getTableDescriptor(final TableName tableName) throws IOException;

 

3. 삭제

 

테이블 삭제는 아래 메서드를 사용하시면 됩니다.

 

void deleteTable(final TableName tableName) throws IOException;

 

4. 비활성

 

HBase에서는 테이블 삭제 전 필수로 비활성을 시켜야 합니다.

 

비활성 메서드는 아래와 같습니다.

 

void disableTable(final TableName tableName) throws IOException;

 

비활성화 메서드는 아래와 같은 동작을 HBase에게 수행하도록 합니다.

 

  1. 모든 리전 서버에서 아직 적용하지 않은 변경사항 처리
  2. 모든 리전 닫음
  3. 해당 테이블의 모든 리전이 어떤 서버에도 배치되어 있지 않다는 정보를 메타( .META. ) 테이블에 기록

 

5. 활성화

 

HBaseAdmin은 테이블의 활성화 메서드도 제공하고 있습니다.

 

void enableTable(final TableName tableName) throws IOException;

 

6. 확인

 

지정 테이블이 활성화되어 있는지, 비활성화 되어 있는지 등의 상태를 확인하는 메서드는 아래와 같습니다.

 

boolean isTableEnabled(final TableName tableName) throws IOException;
boolean isTableDisabled(TableName tableName) throws IOException;
boolean isTableAvailable(TableName tableName) throws IOException;

 

isTableAvailable 메서드의 경우에는 테이블의 활성화 상태가 아닌 단지 존재하는지를 체크하는 메서드입니다.

 

7. 변경

 

테이블에 대한 정의를 변경도 가능합니다.

 

void modifyTable(TableDescriptor td) throws IOException;

 

단, 변경도 삭제와 동일하게 먼저 테이블을 비활성화 시킨 후 수행하여야 합니다.

 

 

3) 스키마 관련 기능

 

HBaseAdmin은 테이블의 컬럼 패밀리에 대한 정의도 변경 가능합니다.

 

void addColumnFamily(final TableName tableName, final ColumnFamilyDescriptor columnFamily) throws IOException;
void deleteColumnFamily(final TableName tableName, final byte[] columnFamily) throws IOException;
void modifyColumnFamily(final TableName tableName, final ColumnFamilyDescriptor columnFamily) throws IOException;

 

4) 클러스터 관련 기능

 

HBaseAdmin 클래스는 클러스터에 관련된 정보를 확인 및 조작할 수 있는 메서드를 제공합니다.

 

1. void flush(final TableName tableName) throws IOException

 

멤스토어에 있는 정보들을 강제로 disk write 하도록 수행하는 메서드입니다.

 

2. void compact(final TableName tableName) throws IOException

 

지정한 테이블을 컴팩션 대기열에 넣는 메서드입니다.

 

3. void majorCompact(final TableName tableName) throws IOException

 

지정 테이블에 대해 주 컴팩션이 일어나도록 트리거링하는 메서드입니다.

 

4. void split(final TableName tableName) throws IOException

 

지정 테이블에 대해 분할 작업을 수행시키는 메서드입니다.

지정 테이블의 모든 리전을 이터레이트하여 분할 작업을 자동적으로 호출합니다.

 

5. void assign(final byte [] regionName) throws

MasterNotRunningException, ZooKeeperConnectionException, IOException

 

리전을 할당할 때 사용하는 메서드입니다.

 

6. void unassign(final byte [] regionName, final boolean force) throws IOException

 

assign과 반대로 리전을 해제할 때 사용하는 메서드입니다.

 

7. void move(final byte[] encodedRegionName, ServerName destServerName) throws IOException

 

지정 리전을 특정 서버로 이동시키는 메서드입니다.

 

즉, 클라이언트는 능동적으로 특정 리전을 현재 배치된 서버가 아닌 다른 서버로 옮길 수 있습니다.

 

8. boolean balancerSwitch(final boolean on, final boolean synchronous) throws IOException

 

리전 밸런서를 키거나 끄는 메서드 입니다.

리번 밸런서랑 해당 리전이 지정 크기보다 커지는 경우 분할 시켜 다른 리전 서버로 보내는 역할을 합니다.

 

9. synchronized void shutdown() throws IOException

 

클러스터를 중단할 때 사용합니다.

 

10. synchronized void stopMaster() throws IOException

 

마스터 서버를 중단할 때 사용합니다.

 

11. synchronized void stopRegionServer(final String hostnamePort) throws IOException

 

특정 리전 서버만을 중단 할 때 사용합니다.

 

4. 마무리

이번 포스팅에서는 클라이언트 API : 관리 기능에 대해 진행하였습니다.

 

다음 포스팅에서는 챕터 6장인 클라이언트 종류에 대해 진행하겠습니다.

 

반응형

'BigData > Hbase' 카테고리의 다른 글

(4) 클라이언트 API : 고급 기능  (0) 2020.04.19
(3) 클라이언트 API : 기본 기능  (0) 2020.04.09
(2) 설치  (0) 2020.04.08
(1) 소개  (0) 2020.04.07
반응형

1. 서론

이번 포스팅에서는 Chapter17의 리액티브 프로그래밍에 대해 진행하도록 하겠습니다.

 

2. 리액티브 매니패스토

리액티브 매니패스토란 리액티브 어플리케이션과 시스템 개발의 핵심 원칙을 공식적으로 정의한 내용입니다.

 

  1. 반응성 : 리액티브 시스템은 빠를 뿐 아니라 더 중요한 특징으로 일정하고 예상할 수 있는 반응 시간을 제공합니다.
  2. 회복성 : 장애가 발생해도 시스템은 반응해야 합니다.
  3. 탄력성 : 어플리케이션의 생명주기 동안 다양한 작업 부하를 받게 되는데 이 다양한 작업 부하로 어플리케이션의 반응성이 위협받을 수 있습니다. 이를 대비해, 자동으로 할당 된 자원 수를 늘립니다.
  4. 메시지 주도 : 회복성과 탄력성을 위해 메시지 기반의 통신으로 이루어 지도록 합니다.

아래는 위 4가지가 어떤 관계로 얽혀있는지 보여줍니다.

 

 

3. 리액티브 스트림과 플로 API

리액티브 프로그래밍은 리액티브 스트림을 사용하는 프로그래밍입니다.

 

리액티브 스트림은 무한의 비동기 데이터를 순서대로 블록하지 않은 역압력을 전제하여 처리하는 기술입니다.

스트림의 비동기가 기반이기 때문에 역압력 기술은 필수입니다.

 

역압력이란, 데이터 소모하는 쪽에서 데이터 발행자에게 요청한 경우에만 처리하도록 하는 기법입니다.

 

 

1) Flow 클래스 소개

 

자바 9 에서는 java.util.concurrent.Flow를 추가했습니다.

 

Flow 클래스는 내부적으로 4개의 인터페이스를 가지고 있습니다.

 

  • Publisher
  • Subscriber
  • Subscription
  • Processor

아래는 각 4개의 인터페이스 정의입니다.

 

@FunctionalInterface
public static interface Publisher<T> {
    public void subscribe(Subscriber<? super T> subscriber);
}

public static interface Subscriber<T> {
    public void onSubscribe(Subscription subscription);
    public void onNext(T item);
    public void onError(Throwable throwable);
    public void onComplete();
}

public static interface Subscription {
    public void request(long n);
    public void cancel();
}

public static interface Processor<T,R> extends Subscriber<T>, Publisher<R> {

}

 

여기서 Processor는 리액티브 스트림에서 처리하는 이벤트의 변환을 담당합니다.

 

 

아래는 이 4개의 인터페이스의 동작 원칙입니다.

 

  • Publisher는 반드시 Subscription의 request 메서드에 정의된 개수 이하의 요소만 Subscriber에 전달해야 합니다.
  • Subscriber 는 요소를 받아 처리할 수 있음을 Publisher에게 알려야 합니다.
  • Publisher 와 Subscriber 는 Subscription을 공유해야 합니다.

 

 

1) 첫번째 리액티브 어플리케이션 만들기

 

아래는 매초 온도를 보고하는 예제입니다.

 

public class TempInfo {

    public static final Random random = new Random();

    private final String town;
    private final int temp;

    public TempInfo(String town, int temp) {
        this.town = town;
        this.temp = temp;
    }
    
    public static TempInfo fetch(String town) {
        if(random.nextInt(10) == 0) {
            throw new RuntimeException("ERROR!!");
        }
        return new TempInfo(town, random.nextInt(100));
    }
}

 

public class TempSubscriber implements Flow.Subscriber<TempInfo> {

    private Flow.Subscription subscription;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.subscription = subscription;
        subscription.request(1);
    }

    @Override
    public void onNext(TempInfo item) {
        System.out.println(item);
        subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        System.err.println(throwable.getMessage());
    }

    @Override
    public void onComplete() {
        System.out.println("Done!");
    }
}

 

public class TempSubscription implements Flow.Subscription {

    private final Flow.Subscriber<? super TempInfo> subscriber;
    private final String town;

    public TempSubscription(Flow.Subscriber<? super TempInfo> subscriber, String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(long n) {
        for(long i = 0L; i < n; i++) {
            try {
                subscriber.onNext(TempInfo.fetch(town));
            } catch (Exception e) {
                subscriber.onError(e);
                break;
            }
        }
    }

    @Override
    public void cancel() {
        subscriber.onComplete();
    }
}

 

public class Main {

    public static void main(String[] args) {
        getTemperatures("New York").subscribe(new TempSubscriber());
    }

    private static Flow.Publisher<TempInfo> getTemperatures(String town) {
        return subscriber -> subscriber.onSubscribe(
                new TempSubscription(subscriber, town)
        );
    }
}

 

 

위 예제에서는 Subscription의 request를 호출하면, Subscriber가 Subscription을 또 호출하는 재귀 문제가 있습니다.

이를 해결하기 위해서는 별도 Executor를 사용할 수 있습니다.

 

Executor 소스를 추가한 코드는 아래와 같습니다.

 

public class TempSubscription implements Flow.Subscription {
    
    private static final ExecutorService executor = Executors.newSingleThreadExecutor(); 
    private final Flow.Subscriber<? super TempInfo> subscriber;
    private final String town;

    public TempSubscription(Flow.Subscriber<? super TempInfo> subscriber, String town) {
        this.subscriber = subscriber;
        this.town = town;
    }

    @Override
    public void request(long n) {
        executor.submit(() -> {
            for(long i = 0L; i < n; i++) {
                try {
                    subscriber.onNext(TempInfo.fetch(town));
                } catch (Exception e) {
                    subscriber.onError(e);
                    break;
                }
            }    
        });
    }

    @Override
    public void cancel() {
        subscriber.onComplete();
    }
}

 

 

 

2) Processor로 데이터 변환하기

 

Processor의 목적은 Publisher를 구독한 다음 수신한 데이터를 가공해 다시 제공하는 것입니다.

 

아래는 위 예제에 Processor를 적용한 코드입니다.

 

public class TempSubscription implements Flow.Subscription {

    private static final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Flow.Processor<? super TempInfo, ? super TempInfo> processor;
    private final String town;

    public TempSubscription(Flow.Processor<? super TempInfo, ? super TempInfo> processor, String town) {
        this.processor = processor;
        this.town = town;
    }

    @Override
    public void request(long n) {
        executor.submit(() -> {
            for(long i = 0L; i < n; i++) {
                try {
                    processor.onNext(TempInfo.fetch(town));
                } catch (Exception e) {
                    processor.onError(e);
                    break;
                }
            }
        });
    }

    @Override
    public void cancel() {
        processor.onComplete();
    }
}

 

public class TempProcessor implements Flow.Processor<TempInfo, TempInfo> {

    private Flow.Subscriber<? super TempInfo> subscriber;

    @Override
    public void subscribe(Flow.Subscriber<? super TempInfo> subscriber) {
        this.subscriber = subscriber;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscriber.onSubscribe(subscription);
    }

    @Override
    public void onNext(TempInfo item) {
        subscriber.onNext(
                new TempInfo(
                        item.getTown(),
                        (item.getTemp() -32) * 5 /9
                )
        );

    }

    @Override
    public void onError(Throwable throwable) {
        subscriber.onError(throwable);
    }

    @Override
    public void onComplete() {
        subscriber.onComplete();
    }
}

 

public class Main {

    public static void main(String[] args) {
        getTemperatures("New York").subscribe(new TempSubscriber());
    }

    private static Flow.Publisher<TempInfo> getTemperatures(String town) {
        return subscriber -> {
            TempProcessor processor = new TempProcessor();
            processor.subscribe(subscriber);
            processor.onSubscribe(new TempSubscription(processor, town));
        };
    }
}

 

Processor를 이용해 onNext 메서드에서 화씨 온도를 섭씨 온도로 변경하여 Subscriber 에게 전달하는 것을 볼 수 있습니다.

 

 

 

 

 

 

 

 

반응형

 

 

 

 

 

 

 

4. 리액티브 라이브러리 RxJava

RxJava는 자바로 리액티브 어플리케이션을 구현하는데 사용하는 라이브러리입니다.

compile group: 'io.reactivex.rxjava2', name: 'rxjava', version: '2.2.19'

 

RxJava는 io.reactivex 패키지하위의 Observable, Observer 를 사용할 수 있습니다.

 

간단히 자바 플로에서 Publisher 가 RxJava에서는 Observable, Subscriber가 Observer라고 보시면 됩니다.

 

 

1) Observable 만들고 사용하기

 

아래는 간단한 Observable 을 만드는 예제 코드입니다.

 

Observable<String> strings = Observable.just("fisrt", "second");

 

just 팩토리 메서드는 한개 이상의 요소를 이용해 방출하는 Observable 을 만듭니다.

 

Observable 구독자는 onNext("first"), onNext("second"), onComplete() 순으로 메시지를 받게 됩니다.

 

특정 시간단위로 상호작용할때는 아래와 같이 interval 팩토리 메서드를 사용할 수 있습니다.

 

Observable<Long> onePerSec = Observable.interval(1, TimeUnit.SECONDS);

 

추가로 RxJava는 역압력을 지원하지 않아, Observable에는 Subscription의 request 메서드 같은 기능이 없습니다.

 

Observer 인터페이스는 위의 Subscriber와 정의도 비슷합니다.

 

아래는 Observer 인터페이스 정의입니다.

 

public interface Observer<T> {
    void onSubscriber(Disposable d);
    void onNext(T t);
    void onCompleted();
    void onError(Throwable e);
}

 

RxJava는 위의 메서드를 모두 구현할 필요가 없고, onNext 만 구현해도 괜찮습니다.

 

예제는 아래와 같습니다.

 

onePerSec.subscribe(i -> System.out.println(TempInfo.fetch("New York")));

 

 

이 예제는 초마다 뉴욕의 온도를 출력하는 예제입니다.

 

하지만 실제로 동작시키면 아무런 출력이 되지 않습니다.

이유는, Observable이 RxJava의 연산 쓰레드 풀 즉 데몬 쓰레드에서 실행되기 때문입니다.

 

RxJava는 호출 쓰레드에서 값을 받는 메서드도 제공합니다.

 

예제는 아래와 같습니다.

 

onePerSec.blockingSubscribe(i -> System.out.println(TempInfo.fetch("New York")));

 

이제 위 예제에 RxJava를 적용해보겠습니다.

 

private static Observable<TempInfo> getTemperatures(String town) {
    return Observable.create(emitter -> {
        Observable.interval(1, TimeUnit.SECONDS)
                .subscribe(i -> {
                    if (!emitter.isDisposed()) {
                        if (i >= 5) {
                            emitter.onComplete();
                        } else {
                            try {
                                emitter.onNext(TempInfo.fetch(town));
                            } catch (Exception e) {
                                emitter.onError(e);
                            }

                        }
                    }
                });
    });
}

 

public class TempObserver implements Observer<TempInfo> {
    
    @Override
    public void onCompleted() {
        System.out.println("Done!");    
    }

    @Override
    public void onError(Throwable e) {
        System.out.println("Got problem : " + e.getMessage());

    }

    @Override
    public void onNext(TempInfo tempInfo) {
        System.out.println(tempInfo);
    }
}

 

public static void main(String[] args) {
    Observable<TempInfo> observable = getTemperatures("New York");
    observable.blockingSubscribe(new TempObserver());
}

 

 

2) Observable 변환하고 합치기

 

Observable에는 스트림과 비슷한 메서드들을 제공합니다.

 

  • map
  • filter
  • merge

 

map

 

map은 스트림의 map 과 같이 요소를 변환하는 메서드입니다.

자바 플로의 Processor라고 보시면 됩니다.

 

아래는 섭씨로 변환하는 작업을 Observable의 map을 사용하여 만드는 예제입니다.

 

private static Observable<TempInfo> getCelsiusTemperature(String town) {
    return getTemperatures(town).map(item -> new TempInfo(
            item.getTown(),
            (item.getTemp() -32) * 5 /9
    ));
}

 

filter

 

filter도 스트림과 동일하게 특정 조건의 데이터만을 추출하는 용도의 메서드입니다.

 

아래는 filter 예제 코드입니다.

 

private static Observable<TempInfo> getNegativeTemperature(String town) {
    return getCelsiusTemperature(town).filter(temp -> temp.getTemp() < 0);
}

 

merge

 

merge는 여러 Observable를 하나의 Observable로 만드는 메서드입니다.

 

아래는 merge 예제 코드입니다.

 

private static Observable<TempInfo> getCelsiusTemperatures(String... towns) {
    return Observable.merge(
            Arrays.asList(towns).stream()
            .map(Main::getCelsiusTemperature)
            .collect(Collectors.toList())
    );
}

 

public static void main(String[] args) {
    Observable<TempInfo> observable = getCelsiusTemperatures("New York", "Chicago", "San Francisco");
    observable.blockingSubscribe(new TempObserver());
}

 

merge 메서드는 Observable의 Iterator을 인수로 받아 한개의 Observable처럼 동작합니다.

 

5. 마무리

이번 포스팅에서는 Chapter17 리액티브 프로그래밍 대해 진행하였습니다.

이렇게, 모던 자바 인 액션에 대한 포스팅은 완료했습니다.

반응형

+ Recent posts