본문 바로가기

네트워크/예제

[Network] - Java로 구현한 소켓 통신 서버(2)

https://teachingforme.tistory.com/25

 

[Network] - Java로 구현한 소켓 통신 서버(1)

졸업 작품에서 모바일(안드로이드)과 메인 스테이션(데스크탑 컴퓨터)간의 통신을 구현해야해서 자바로 서버 코드를 작성하게 되었다. 그래서 한번 처음부터 끝까지 개발 일지 형식으로 글로

teachingforme.tistory.com

 

 

 저번에 작성하였던 서버 코드를 대거 수정하였다. 지난 번에 작성하였던 서버 코드는 blocking synchronous 방식으로 짜여진 코드로, 클라이언트가 접속할 때마다 쓰레드를 하나씩 늘리는 방식이다.

 

 

 

 구현은 쉽지만 클라이언트가 접속할 때마다 쓰레드를 하나씩 늘린다는건 정말 비효율적이고 한계가 뚜렷한 방식이기 때문에 아쉬움이 많이 남았고 이번에 시간을 좀 들여서 코드를 전부 갈아엎었다.

 

 

 

 

 자바의 경우 c계열 언어들처럼 iocp 방식으로 서버를 구성하는 것이 불가능 하다고 생각을 하여서 처음에는 위에서 언급했던 대로 구현하였으나 조금더 찾아보니 iocp와 이름만 다를 뿐, 동작방식이 같은 api가 이미 존재하였고 해당 api를 이용하여 서버 코드를 재구성하였다.

 

package ServerCore;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.LinkedList;
import java.util.Queue;

public abstract class Session {

	public AsynchronousSocketChannel _channel;
	
	private ByteBuffer _recvBuffer;
	private ByteBuffer _sendBuffer;
	
	public abstract void OnConnected();
	public abstract void OnRecvPacket(byte[] data);
	public abstract void OnDisconnected();
	
	private Queue<byte[]> _sendingQueue = new LinkedList<byte[]>();
	
	public void OnRecv(byte[] recvBuffer) 
	{
		OnRecvPacket(recvBuffer);	
		
	}
	
	public void OnSend(byte[] sendBuffer)
	{ 
		if(sendBuffer[1] == Define.PacketId.Disc.getValue()) // 연결종료 패킷 전송 시 채널 close
    		{
    			Disconnect();
    		}
	}
		
	public void Init(AsynchronousSocketChannel channel)
	{
		this._channel = channel;
		this._recvBuffer = ByteBuffer.allocate(1024);

		RegisterRecv();
	}
	
	
	
	
	// Session 통신 영역
	// receive 버퍼는 추후 고려 TODO...
	void RegisterRecv() 
	{
		_recvBuffer.clear();
		_channel.read(_recvBuffer, null, RecvHandler());
	}
	
	
	// async read completed 
	private CompletionHandler<Integer, Void> RecvHandler() {
        return new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer result, Void attachment) {
            	
            	if(result.intValue() > 0 && result.intValue() >= _recvBuffer.array()[0]) // 파싱 가능한지 확인
	     	{
	            try 
	            {
	                System.out.println(
	        					"Log - " + "[" + Define.LogId.Recv.getValue() + "]" + 
	        					" - PacketID : " + _recvBuffer.array()[1] + " from " + _channel.getRemoteAddress());
	                	
	                	OnRecv( _recvBuffer.array());
	            }
	            catch(Exception e) 
	            {
	                e.printStackTrace();
	            }
	                
	            RegisterRecv(); // 비동기 호출 재등록
        	}
            }
            
            @Override
            public void failed(Throwable exc, Void attachment) { }
        };
    }
	
	public synchronized void Send(byte[] data) 
	{
		
		if(_sendingQueue.isEmpty()) { // 보낼 패킷들이 pending되지 않은 경우
			_sendingQueue.add(data);
			RegisterSend();
		}
		else	  					  // 이미 비동기 작업 실행중인 경우
			_sendingQueue.add(data);
	}
	
	void RegisterSend() 
	{
		this._sendBuffer = ByteBuffer.wrap(_sendingQueue.peek());
		_channel.write(_sendBuffer, null, SendHandler());	
	}
	

    private CompletionHandler<Integer, Void> SendHandler() {
        return new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer result, Void attachment) 
            {
            	
            	try 
                {
            		System.out.println(
        					"Log - " + "[" + Define.LogId.Send.getValue() + "]" + 
        					" - PacketID : " + _sendBuffer.array()[1] + " to " + _channel.getRemoteAddress());
            		
            		
            		_sendingQueue.poll();
                	OnSend(_sendBuffer.array());
                	if(!_sendingQueue.isEmpty()) // 보낼 패킷이 밀린경우 재호출
                		RegisterSend();
                	
                	
                }
                catch(Exception e) 
                {
                    e.printStackTrace();
                }
                
            }
            @Override
            public void failed(Throwable exc, Void attachment) { }
        };
    }
    
    public void Disconnect() 
    {
    	try 
    	{
		_channel.close();
	} 
    	catch (IOException e) 
    	{
		e.printStackTrace();
	}
    	
    	OnDisconnected();
    	
    }

}

 

 

 이전에 서버 코드를 작성할 때 사용하였던 기본 Socket과 달리 AsynchronousSocketChannel을 사용한다. 말 그대로 비동기 처리를 위한 api이다. 기본적인 함수들은 유지한채 iocp방식으로 동작하기 위한 클래스들을 구현하였다. 특히 비동기 처리를 위해서는 CompletionHandler라는 클래스를 구현하여야 한다. 자바는 함수를 인자로 넣어줄 수 없기 때문에 따로 구현해야하는 클래스가 존재하며, read write accept connect 동작에 상관없이 동일한 클래스를 구현하고 완료 시 작업과 실패 시 작업을 작성해주면 된다.

 

 

 

 한 가지 의문이 드는 것은 c계열 언어에서는 channel이라는 키워드를 본적이 없는데 자바로 네트워크 관련 api를 찾아보면 channel이라는 키워드가 자주 나온다. read write 등의 송수신 함수들을 호출한다는 점에서 소켓과 동일하나 채널이라고 부르는게 의문이 들기는 하나 언어간의 차이를 두기위해 이름을 달리하였겠거니 조심스레 추측을 해본다.

 

 

 

 애초에 AsynchronousSocketChannel을 이용한 비동기 처리 서버는 iocp와 다를게 없다. 그도 그럴게 read나 write 함수 관련해서 오류가 나면 자바 api 안에 iocp 라는 클래스가 있는것을 확인할 수 있었다.

 

 

 

 사용방식이 완전히 같긴 하지만 iocp자체는 MS에서 제공하는 api고 윈도우에서만 돌릴 수 있다는 특징이 있기에 애초에 JVM으로 돌아가는 자바에서의 iocp는 사용방식이 같을 뿐 내부에서 돌아가는 방식은 완전히 다를 것이다.

 

 

 

 

 

 

public synchronized void Send(byte[] data) 
{
	if(_sendingQueue.isEmpty()) { // 보낼 패킷들이 pending되지 않은 경우
		_sendingQueue.add(data);
		RegisterSend();
	}
	else // 이미 비동기 작업 실행중인 경우
		_sendingQueue.add(data);
}

 

 

 비동기로 처리하게 되면 기본적으로 멀티쓰레딩인 점을 감안하고 코드를 작성하여야 한다. 그렇지 않으면 오만가지 에러가 발생하게 되는데 예를 들면 위와 같은 Send 함수이다. 하나의 Session에서 한번에 여러개의 쓰레드가 들러붙어 메시지를 보내고 싶은 상황이 있을 수 있다. 근데 문제는 Send를 여러번 호출하게 되면 예외가 발생한다.

 

 

 따라서 해결책을 찾았고 위의 코드처럼 Send 함수를 저린식으로 구성한 이유는 다음과 같다.

 

 

 

java.nio.channels.writependingexception - 이전의 쓰기 처리가 완료되기 전에 어떤 쓰레드가 쓰기 처리를 시작                                                                            하려 할 때 발생

 

 

 

 위의 예외가 지속적으로 발생하였고 해결을 위해 예외의 원인이 위와같음을 찾았다.

 

 

 그래서 synchronized 키워드를 이용해서 전송 호출 함수는 쓰레드당 하나만 접근하게 하였다. 그런데도 불구하고 에러가 높은 확률로 발생하였다. 사실 synchronized 키워드로 하나의 쓰레드만 접근시키게 한다고 해도 에러는 해결이 될 수가 없다. 예를 들어 전송 요청 작업을 완료하고 completed 메소드가 실행중인 도중에 Send를 호출하여 쓰레드가 접근하게 되면 또 다시 중복으로 호출이 되여 예외가 발생한다.

 

 

 

 그럼 이것을 어떻게 해결하는가? 위의 코드처럼 SendingQueue라는 큐를 이용하여 다른 쓰레드가 이미 전송작업을 진행중인 경우 전송함수를 호출하지 않고 큐에 보내고자 하는 패킷을 넣은 후 그냥 지나가게 끔 하였다. 이렇게 하게 되면 전송 작업이 완전히 완료되어 쓰레드가 회수되기 전까지 다른 쓰레드는 전송 함수를 호출할 수 없게 된다.

 

 

 

 

private CompletionHandler<Integer, Void> SendHandler() {
        return new CompletionHandler<Integer, Void>() {
            @Override
            public void completed(Integer result, Void attachment) 
            {
            	
            	try 
                {
            		System.out.println(
        					"Log - " + "[" + Define.LogId.Send.getValue() + "]" + 
        					" - PacketID : " + _sendBuffer.array()[1] + " to " + _channel.getRemoteAddress());
            		
            		
            		_sendingQueue.poll();
                	OnSend(_sendBuffer.array());
                	if(!_sendingQueue.isEmpty()) // 보낼 패킷이 밀린경우 재호출
                		RegisterSend();
                	
                	
                }
                catch(Exception e) 
                {
                    e.printStackTrace();
                }
                
            }
            @Override
            public void failed(Throwable exc, Void attachment) { }
        };
    }

 

 

 

 위의 코드는 전송 완료시 호출되는 함수를 구현한 클래스이다. 주목할 것은 completed 내부의 내용이다. 로그를 Print하고 전송 완료 작업을 진행한 후 작업을 나가기 전 SendingQueue를 확인하여 자신이 전송 완료 작업을 진행하던 도중에 다른 전송할 패킷을 SendingQueue에 넣었는 지 확인하고 만약 있다면 다시 Send(비동기 등록)함수를 호출한다.

 

 

 

 이렇듯 쉽게 쉽게 구현하였던 이전 코드에 비해 멀티쓰레딩을 위해 고려해야할 점이 매우 많았다. 따라서 버그를 잡기위해 많은 시간이 들었으나 성능에는 유의미한 변화가 생겼다.

 

 

 

 

 

 

 

 완성된 서버에 20명의 dummy client가 동시에 접속한 후 접속을 해제 하게끔 만들어 보았다. 모두 정상적으로 접속한 후 접속 종료 되었다는 로그를 찍고 있다. 이전 서버였다면 한번에 20개의 쓰레드가 필요하였다는 것인데 context switching 비용때문에 이런식으로 깔끔하게 처리 되지 않았을 것이다. 처리 되는게 문제가 아니라 예외가 찍혔을 것이다.

 

 

 

 그래도 개강 전에 서버의 기본 틀은 갖춰진것 같아 마음에 든다. 개강 후에는 필요한 패킷 디자인을 추가한다던가 좌표 연산 코드를 짠다던가 자질구레한 작업들을 하게 될거 같은데 그것이 지금까지 하였던 일에 비해 쉽길 바란다.