본문 바로가기
공부/java

IO 와 NIO 를 이용한 입출력 (2/2)

by 샤샤샤샤 2024. 5. 17.

 NIO

자바 4에서 새롭게 등장하고 7에서 재정비된 NIO 는 IO 와 비교해 여러 강점을 가진다.

 

1. MMIO(Merory-Mapped IO) 방식으로 버퍼 지원

2. 넌블로킹 모드 지원

3. Scatter/Gather 지원

4. 파일락

 

이전 포스팅에 1,2,3, 의 내용이 정리되어 있다. 파일락의 경우, 어떤 파일을 수정하는 동안 다른 쓰레드에서 읽기 작업을 가능하게 할지 설정하는 기술이다.

 

MMIO

일반적으로 I/O 작업이 이뤄질때, 이를 위한 별도의 메모리 공간이 할당된다. 문제는 I/O 작업이 일어날 때마다 이 별도의 메모리 공간(자바에서는 Heap)을 사용하기 위한 시스템콜이 일어나고, 이 작업이 상당한 자원을 소모한다는 것이다.

이 문제를 해결하기 위해 운영체제는 MMIO 라는 기술을 지원한다.

mmio 는 I/O 데이터를 별도의 메모리가 아닌, 네이티브 메모리 공간에 직접 저장하는 방식이다. 
기존의 자바 방식을 통해 A라는 파일에 접근한다고 하면 아래와 같은 과정을 거친다.

 

1. 자바 내부적으로 A파일 접근 (InputStream)

2. 자바 heap 에 생성된 stream 을 통해 데이터 접근 요청

3. 시스템 콜이 일어나면서 운영체제에서 요청한 데이터를 찾아와 stream 으로 전달해줌

4. 2부터 반복

 

즉, 파일에 접근할때 계속해서 시스템 콜이 일어난다.

반면 mmio 는 네이티브 메모리에 직접 접근이 가능한 방식이다. 따라서 처음 한번 데이터를 메모리에 적재하는 작업 이후에는 시스템 콜이 일어나지 않는다(메모리에 한번에 못담길 정도로 크다면 여러번 일어날수도 있음). 그 과정은 아래와 같다.

 

1. 자바 내부적으로 A 파일 접근 (FileChannel)

2. 자바 heap 에 생성된 FileChannel의 map() 함수를 이용해 데이터를 메모리에 적재 요청

3. 시스템 콜이 일어나고, 네이티브 메모리에 데이터가 저장됨.

4. 이후 데이터 접근시, HDD나 SSD 같은 저장장치가 아닌 네이티브 메모리에 접근하기 때문에 시스템 콜이 없음

 

결론적으로 MMIO 는 자주 i/o 가 일어나는 작업에서 빠른 속도를 보장한다. 다만 i/o 가 적게 일어난다면 과거 방식이 더 빠를수도 있다.

 

코드 예시1. mmio 를 통한 파일 읽기

    private static final int BUFFER_SIZE = 128;
  

    public static void main(String[] args) {
        long time = measureRunTime(MemoryMappedIo::mmioTest);
        System.out.println("mmio 를 통한 파일 읽기 속도: " + time + " millisecond");
    }
    
    
    public static long measureRunTime(Runnable task) {
        long startTime = System.currentTimeMillis();
        task.run();
        long endTime = System.currentTimeMillis();
        return endTime - startTime;
    }
    
    public static void mmioTest() {
        Path path = Paths.get("C:\\Users\\Hojun\\Desktop\\git\\JavaStudy\\IOAndNIO\\CopyDummy.txt");
        File file1 = path.toFile();

        long size;
        try {
            size = Files.size(path);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        int bufferCount = (int) (size / BUFFER_SIZE + 1);
        size = bufferCount * BUFFER_SIZE;

        try (RandomAccessFile fileData = new RandomAccessFile(file1, "rw")) {
            fileData.setLength(size);
            ByteBuffer buffer = fileData.getChannel()
                    .map(FileChannel.MapMode.READ_WRITE, 0L, size);
            buffer.flip();
            while (buffer.hasRemaining()) {
                byte b = buffer.get();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

 

2. mmio를 통한 파일 전송

    public static void main(String[] args) {
        long time = measureRunTime(MemoryMappedIo::mmioTest);
        System.out.println("mmio 를 통한 파일 읽기 속도: " + time + " millisecond");

        long time2 = measureRunTime(MemoryMappedIo::mmioTansferTest);
        System.out.println("mmio 를 통한 파일 전송 속도: " + time2 + " millisecond");

    }
    
      public static void mmioTansferTest() {
        Path path = Paths.get("C:\\Users\\Hojun\\Desktop\\git\\JavaStudy\\IOAndNIO\\CopyDummy.txt");
        Path path2 = Paths.get("C:\\Users\\Hojun\\Desktop\\git\\JavaStudy\\IOAndNIO\\testDummy.txt");

        File file1 = path.toFile();
        File file2 = path2.toFile();

        FileOutputStream fileOutputStream;
        try {
            fileOutputStream = new FileOutputStream(file2);

        } catch (FileNotFoundException e) {
            throw new RuntimeException(e);
        }
        FileChannel fileOutputChannel = fileOutputStream.getChannel();

        long size;
        try {
            size = Files.size(path);
        } catch (IOException e) {
            throw new RuntimeException(e);
        }


        int bufferCount = (int) (size / BUFFER_SIZE + 1);
        size = bufferCount * BUFFER_SIZE;

        try (RandomAccessFile fileData = new RandomAccessFile(file1, "rw")) {
            fileData.setLength(size);
            FileChannel channel = fileData.getChannel();
            ByteBuffer originalData = fileData.getChannel()
                    .map(FileChannel.MapMode.READ_WRITE, 0L, size);


            Charset charset = Charset.defaultCharset();
            ByteBuffer newData = charset.encode("new Data");


            ByteBuffer combine = ByteBuffer.allocate((int) size+100);
            combine.put(newData);
            combine.put(originalData);
            combine.position(0);
            channel.write(combine);

            channel.transferTo(0, size, fileOutputChannel);
            channel.close();

        } catch (IOException e) {
            e.printStackTrace();
        }
    }

결과

 

 

파일락

당연히 nio api 를 사용하니 channel 을 사용해야 한다.

먼저 파일락을 걸어두자.

    public static void main(String[] args) {
        File file = new File("C:\\Users\\Hojun\\Desktop\\git\\JavaStudy\\IOAndNIO\\Gather-Scatter.txt");

        try (FileChannel channel = new RandomAccessFile(file, "rw").getChannel()){
            // channel.lock(잠금 시작 위치, 잠금시킬 데이터 크기, 잠금되는 동안 다른 쓰레드에서 접근 가능 여부)
            try (FileLock lock = channel.lock(0, Long.MAX_VALUE, false)) {
                boolean isShared = lock.isShared();
                System.out.println("Is Shared Lock? : " + isShared);
                
                ByteBuffer buffer = ByteBuffer.allocate(1024); // 버퍼 생성
                int bytesRead = channel.read(buffer); // 파일 읽기
                
                System.out.println("Bytes read: " + bytesRead);
                buffer.flip(); // 버퍼를 읽기 모드로 전환
                byte[] bytes = new byte[buffer.remaining()];
                
                System.out.println("File content: " + new String(bytes)); // 파일 내용 출력
                lock.release();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

 

 

이 코드를 실행시키면  Gather-Scatter.txt 파일은 잠금에 걸려 다른 쓰레드에서 접근이 거부된다(lock 메서드에 false 대신 true 값을 주면 접근 가능해진다).
그러면 이제 해당 파일에 접근하는 다른 코드를 작성해보자.

 static class MyRunnable implements Runnable{
        @Override
        public void run() {
            File file = new File("C:\\Users\\Hojun\\Desktop\\git\\JavaStudy\\IOAndNIO\\Gather-Scatter.txt");
            try (FileChannel channel = new RandomAccessFile(file, "r").getChannel()){
                //  new RandomAccessFile(file, "r") : r - 읽기 모드. 만약 rw 라면 읽기 쓰기 둘다 가능한 모드임.

                ByteBuffer buffer = ByteBuffer.allocate(1024); // 버퍼 생성

                int bytesRead = channel.read(buffer); // 파일 읽기
                System.out.println("Bytes read2: " + bytesRead);

                buffer.flip(); // 버퍼를 읽기 모드로 전환

                byte[] bytes = new byte[buffer.remaining()];
                System.out.println("File content2: " + new String(bytes)); // 파일 내용 출력

                buffer.get(bytes); // 버퍼에서 데이터 읽기

            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

이 코드 역시 똑같은 파일에 접근해서 읽어오는 작업을 수행한다.
이제 이 코드를 다시 main 함수에서 새로운 쓰레드를 생성해 실행시켜 보자.

 

    public static void main(String[] args) {
        File file = new File("C:\\Users\\Hojun\\Desktop\\git\\JavaStudy\\IOAndNIO\\Gather-Scatter.txt");

        try (FileChannel channel = new RandomAccessFile(file, "rw").getChannel()){
            // channel.lock(잠금 시작 위치, 잠금시킬 데이터 크기, 잠금되는 동안 다른 쓰레드에서 접근 가능 여부)
            try (FileLock lock = channel.lock(0, Long.MAX_VALUE, false)) {
                boolean isShared = lock.isShared();
                System.out.println("Is Shared Lock? : " + isShared);
                
                // 추가! 새로운 쓰레드 실행
                Thread newThread = new Thread(new MyRunnable(), "Thread 1");
                newThread.start();
                //
                
                ByteBuffer buffer = ByteBuffer.allocate(1024); // 버퍼 생성
                int bytesRead = channel.read(buffer); // 파일 읽기
                
                System.out.println("Bytes read: " + bytesRead);
                buffer.flip(); // 버퍼를 읽기 모드로 전환
                byte[] bytes = new byte[buffer.remaining()];
                
                System.out.println("File content: " + new String(bytes)); // 파일 내용 출력
                lock.release();
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


이때 결과물은 다음과 같다.

이와 같이 현재 쓰레드에서 작업 중인 파일에 다른 쓰레드에서의 접근을 거부하는 기능이 파일락이다.

Scatter/Gather

만약 서로 다른 주소를 가르키고 있는 버퍼 a 와 버퍼 b 를 참조해 데이터를 읽는다고 생각해보자.
먼저 a를 읽어오기 위한 시스템 콜이 일어나고, 다시 b를 읽어오기 위한 시스템 콜이 일어난다. 이 작업을 동시에 처리하여 a, b 를 한번의 시스템 콜로 읽어오는 것이 바로 scatter 이다. gather 는 이와 반대로 한번에 쓰기 작업을 하는 것이다.
간단한 내용이기에 바로 Gather 작업 코드만 작성했다.

public class Gather {

    public static void main(String[] args) throws IOException {
        FileOutputStream fileOutputStream = new FileOutputStream(
                "C:\\Users\\Hojun\\Desktop\\git\\JavaStudy\\IOAndNIO\\Gather-Scatter.txt");

        FileChannel channel = fileOutputStream.getChannel();

        ByteBuffer a = ByteBuffer.allocate(128);
        ByteBuffer b = ByteBuffer.allocate(1024);
        ByteBuffer[] array = {a, b};

        a.put("Hello".getBytes(StandardCharsets.UTF_8));
        b.put("World".getBytes(StandardCharsets.UTF_8));

        a.flip();
        b.flip();

        channel.write(array);
        channel.close();


        // 이때 a와 b 버퍼 위치는 서로 다르지만, write 작업이 이뤄질때 한번에 접근함.
        // 즉, a 접근해서 데이터 가져오고 다시 b 접근해서 데이터를 가져오는 식의 두번의 오버헤드가 일어나지 않음
    }
}

 

 

Non-Blocking(논블록킹)

논블로킹이란 i/o 작업이 일어나는 동안 쓰레드가 멈추지 않을 의미한다.
과거의 i/o 방식은 i/o 가 일어나는 동안 쓰레드가 완전히 동작을 멈췄다. 따라서 웹과 같이 i/o 요청이 동시에 많이 들어오는 프로세스를 만들면 비동기적 처리를 하기 위해, 요청 하나당 쓰레드 하나를 생성해야 했다.

 

하지만 NIO 의 Selector를 이용하면 논블록킹으로 설정할수 있어 하나의 쓰레드 안에서 여러가지 요청을 처리할수 있게 되었다.

이를 위해서 NIO 의 셀렉터는 리액터 패턴의 리엑터 역할을 수행하게 된다. 리액터 패턴에 대한 자세한 내용은 여기서 다루지 않는다. 만약 스프링을 안다면 스프링에서 요청이 들어왔을때 처리하는 방식이라고 생각하면 된다.

 

셀렉터에는 크게 3가지 중요 개념이 존재한다.

 

1. 셀렉터

2. 셀렉션키

3. 셀렉터블 채널

 

셀렉션은 리엑터 패턴의 리엑터 역할을 한다. 즉, 새로운 요청이 들어올때 어떤 행동을 할지 정해주는 중계기와 같은 역할이다.

셀렉션 키는 행동을 구분하기 위해 사용한다. 키값은 연결 허락(OP_CONNECT ), 연결 완료(OP_ACCEPT), 요청 읽기(OP_READ)

, 응답 쓰기(OP_WRITE ) 로 나뉜다.

셀렉터블 채널은 연결되는 주체로 서버와 클라이언트같이 연결되는 대상이라고 생각하면 된다. 하지만 논블럭킹을 지원하는 채널이여야만 하기에 SelectableChannel  클래스를 상속받은 클래스만 셀렉터에 등록될수 있다.

 

셀렉터는 다음과 같이 구동된다.

 

1. 서버 역할을 할 SelectableChannel(또는 이를 상속받은) 을 생성한다2. 서버를 연결허락(OP_ACCEPT)상태로 셀렉터에 등록한다.3. 이후 서버에 들어오는 요청 타입(셀렉션 키값) 에 따라 코드가 작동한다.

 

셀렉션 키란?

I/O 작업은 시스템 콜이 일어나 운영체제 쪽에서 처리해야 한다. 때문에 기존의 I/O 작업은 운영체제의 작업이 끝날때까지 기다리는 방식이었다. 하지만 비동기로 작동하게 되면, 컴퓨터 메모리는 운영체제에서 작업(I/O)에 집중하다가도 주기적으로 어플리케이션에 돌아가 새로운 요청이 있는지 확인해야 한다. 당연히 컨텍스트 스위칭이 자주 일이나기 때문에 비효율적이다.

 

이를 해결하기 위해 셀렉션 키가 존재한다.

 

앞서 말했듯 셀렉션 키는 주요한 특정 이벤트를 의미한다. 그렇기에 처리되지 않은 셀렉션 키가 존재할때, 즉, 처리되지 않은 요청이 존재할때만 어플리케이션에 돌아가면 되는 것이다. 비유하자면 식당에서 주방 안에서 음식을 만들던 직원들이 주문벨이 울릴때만 주문을 받으러 가는 방식이라고 생각하면 된다.

 

셀렉터를 이용해 채팅을 구현하는 코드는 다음과 같다.

 

서버


public class NonBlockingTest {

    public static void main(String[] args) throws IOException, InterruptedException {

        serverStart();

    }

    public static void serverStart() throws IOException {

        ServerSocketChannel server = ServerSocketChannel.open(); // 소켓 서버 생성
        server.configureBlocking(false); // 논블로킹 설정
        server.bind(new InetSocketAddress(9090)); // 9090 포트

        Selector selector = Selector.open();
        server.register(selector, SelectionKey.OP_ACCEPT);
        // 서버는 항상 accept 상태로 연결을 대기하고 있어야 함

        while (true) { //연결이 들어올 때까지 대기

            selector.select();// 처리할 채널이 있을 때까지 블로킹(스레드 정지) 됨

            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            // selectedKeys 연결된 요청의 상태를 반환함
            Iterator<SelectionKey> keyIterator = selectedKeys.iterator();

            while (keyIterator.hasNext()) {
                SelectionKey key = keyIterator.next();
                keyIterator.remove(); // 해당 요청이 처리되면 지우기

                if (key.isAcceptable()) { // 연결 수락이 가능한 경우.
                    // 서버와 연결되기 클라이언트 채널의 상태는 
                    // 직접 SelectionKey값을 변경해줘도 무조건 연결 요청으로 인식됨
                    System.out.println("연결중!");

                    // ServerSocketChannel : 서버 역할을 하는 동시에 소켓 채널 연결을 위해 사용되는 클래스
                    ServerSocketChannel serverChannel = (ServerSocketChannel) key.channel();
                    SocketChannel clientChannel = serverChannel.accept(); // 소켓 채널 연결

                    clientChannel.configureBlocking(false);
                    clientChannel.register(selector, SelectionKey.OP_READ);
                    // 클라이언트 채널의 요청을 read 하는 상태

                    System.out.println("연결 완료: " + clientChannel.getRemoteAddress());

                } else if (key.isConnectable()) { // 연결을 하고자 하는 경우
                    System.out.println("새로운 연결 가능!");

                } else if (key.isReadable()) { // 연결된 상대를 읽을수 있을때
                    System.out.println("상대 데이터를 읽을수 있음");

                    // 클라이언트로부터 데이터 읽기
                    SocketChannel clientChannel = (SocketChannel) key.channel();
                    ByteBuffer buffer = ByteBuffer.allocate(1024);
                    int bytesRead = clientChannel.read(buffer);

                    if (bytesRead == -1) {
                        System.out.println("데이터 없음");
                        clientChannel.close();
                    } else {
                        buffer.flip();
                        System.out.println("받은 메시지: " + new String(buffer.array(), 0, buffer.limit()));
                        buffer.clear();
                    }

                } else if (key.isWritable()) { // 연결된 상대에게 쓸수 있을때
                    System.out.println("상대에게 데이터를 보낼수 있음");
                }
            }
        }
    }

}

 

SelectableChannel 클래스의 register() 함수를 이용해 셀렉터를 등록할수 있다. 첫번째 매개변수는 해당 채널을 관리할 셀렉터, 두번째 매개변수는 해당 채널의 이벤트 상태(키값) 이다. 세번째로 정보를 담은 객체 매개변수를 넣을수도 있다. 이경우 오로지 정보를 보관하기 위한 용도의 객체가 된다.

select() 함수는 새로운 이벤트가 발생할때까지 쓰레드를 블록시킨다. 그러다 새로운 이벤트가 발생하면 그때 쓰레드가 다시 활성화되는 방식이다.

 

 

클라이언트

public class NonBlockingClient {

    public static void main(String[] args) throws IOException {

        sendMessage("This is client1 test", 9090);// 클라이언트 메시지 발송
        sendMessage("222: This is client2 test", 9090);// 클라이언트 메시지 발송2
        
    }

    public static void sendMessage(String message, int port) throws IOException {
        SocketChannel socketChannel = SocketChannel.open();
        socketChannel.configureBlocking(false);
        socketChannel.connect(new InetSocketAddress("localhost", port));

        while (!socketChannel.finishConnect()) { // 연결 완료될 때까지 기다립니다.
            System.out.println("연결중");
        }

        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());

        while (buffer.hasRemaining()) {
            socketChannel.write(buffer);
        }
        socketChannel.close();
    }
}

 

 

이상으로 NIO 에 대한 설명을 마친다.