ProgramingTip

Java에서 프로토콜 기능에 해당하는 C ++이 있습니까?

bestdevel 2020. 11. 27. 21:05
반응형

Java에서 프로토콜 기능에 해당하는 C ++이 있습니까?


C ++ 및 Java 모두에서 파일에서 여러 프로토콜 버퍼 메시지를 읽거나 쓰려고합니다. Google은 메시지 길이 접두사를 쓰는 것을 제안하지만 기본적으로 그렇게 할 수있는 방법은 없습니다 (내가 볼 수 있음).

그러나 버전 2.1.0의 Java API는 분명히 해당 작업을 수행하는 "구분 된"I / O 함수 집합을 선언합니다.

parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo

C ++에 존재하는 것이 있습니까? 어떤 경우에는 Java API가 첨부하는 크기 접두사에 대한 연결 형식은 무엇입니까? 그러면 C ++에서 해당 메시지를 구문 분석 할 수 있습니까?


최신 정보 :

이제 v3.3.0부터 존재합니다 .google/protobuf/util/delimited_message_util.h


나는 여기 파티에 조금 늦었지만 아래 구현 에는 다른 답변에서 누락 된 일부 최적화가 포함되어 있으며 64메가바이트의 입력 후에도 실패하지 않을 것입니다 ( 전체 스트림이 아닌 각 개별 메시지에 64메가바이트 제한여전히 적용하지만 ).

(저는 C ++ 및 Java protobuf 라이브러리의 작성자이지만 더 이상 Google에서 일하지 않습니다.이 코드가 공식 라이브러리에 발생하지 않았거나 갑자기 발생하는 점에 대해 이렇게 생겼습니다.)

bool writeDelimitedTo(
    const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
  // We create a new coded stream for each message.  Don't worry, this is fast.
  google::protobuf::io::CodedOutputStream output(rawOutput);

  // Write the size.
  const int size = message.ByteSize();
  output.WriteVarint32(size);

  uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
  if (buffer != NULL) {
    // Optimization:  The message fits in one buffer, so use the faster
    // direct-to-array serialization path.
    message.SerializeWithCachedSizesToArray(buffer);
  } else {
    // Slightly-slower path when the message is multiple buffers.
    message.SerializeWithCachedSizes(&output);
    if (output.HadError()) return false;
  }

  return true;
}

bool readDelimitedFrom(
    google::protobuf::io::ZeroCopyInputStream* rawInput,
    google::protobuf::MessageLite* message) {
  // We create a new coded stream for each message.  Don't worry, this is fast,
  // and it makes sure the 64MB total size limit is imposed per-message rather
  // than on the whole stream.  (See the CodedInputStream interface for more
  // info on this limit.)
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  uint32_t size;
  if (!input.ReadVarint32(&size)) return false;

  // Tell the stream not to read beyond that size.
  google::protobuf::io::CodedInputStream::Limit limit =
      input.PushLimit(size);

  // Parse the message.
  if (!message->MergeFromCodedStream(&input)) return false;
  if (!input.ConsumedEntireMessage()) return false;

  // Release the limit.
  input.PopLimit(limit);

  return true;
}

좋아, 그래서 나는 필요한 것을 구현하는 C ++ 함수를 내장 수지 만, Java API 참조를 내가 포함하지 않는 내부에서 다음과 같이 MessageLite 인터페이스를 구현 합니다.

void writeDelimitedTo(OutputStream output)
/*  Like writeTo(OutputStream), but writes the size of 
    the message as a varint before writing the data.   */

따라서 Java 크기 접두사는 (Protocol Buffers) varint입니다!

그 정보로 무장 한 저는 C ++ API를 보고 다음과 같은 CodedStream 헤더를 찾았 습니다 .

bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)

이를 사용하여 작업을 수행하는 자체 C ++ 함수를 롤링 할 수 있어야합니다.

그래도 주 메시지 API에 추가해야합니다. Marc Gravell의 우수한 protobuf-net C # 포트 (SerializeWithLengthPrefix 및 DeserializeWithLengthPrefix를 통해)도 마찬가지입니다.


CodedOutputStream / ArrayOutputStream을 사용하여 메시지 (크기 포함)를 작성하고 CodedInputStream / ArrayInputStream을 사용하여 메시지 (크기 포함)를 읽는 모든 문제를 해결했습니다.

예를 들어, 다음 의사 코드는 메시지 다음에 메시지 크기를 씁니다.

const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;

google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);

codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);

글을 쓸 때 버퍼가 메시지에 맞도록 충분히 큰지 확인해야합니다 (크기 포함). 그리고 읽을 때 버퍼에 전체 메시지 (크기 포함)가 포함되어 있는지 확인해야합니다.

Java API에서 제공하는 것과 같은 방법 방법을 C ++ API에 추가하면 확실히 편리 할 것입니다.


여기 있습니다 :

#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>

using namespace google::protobuf::io;

class FASWriter 
{
    std::ofstream mFs;
    OstreamOutputStream *_OstreamOutputStream;
    CodedOutputStream *_CodedOutputStream;
public:
    FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary)
    {
        assert(mFs.good());

        _OstreamOutputStream = new OstreamOutputStream(&mFs);
        _CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
    }

    inline void operator()(const ::google::protobuf::Message &msg)
    {
        _CodedOutputStream->WriteVarint32(msg.ByteSize());

        if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
            std::cout << "SerializeToCodedStream error " << std::endl;
    }

    ~FASWriter()
    {
        delete _CodedOutputStream;
        delete _OstreamOutputStream;
        mFs.close();
    }
};

class FASReader
{
    std::ifstream mFs;

    IstreamInputStream *_IstreamInputStream;
    CodedInputStream *_CodedInputStream;
public:
    FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary)
    {
        assert(mFs.good());

        _IstreamInputStream = new IstreamInputStream(&mFs);
        _CodedInputStream = new CodedInputStream(_IstreamInputStream);      
    }

    template<class T>
    bool ReadNext()
    {
        T msg;
        unsigned __int32 size;

        bool ret;
        if ( ret = _CodedInputStream->ReadVarint32(&size) )
        {   
            CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
            if ( ret = msg.ParseFromCodedStream(_CodedInputStream) )
            {
                _CodedInputStream->PopLimit(msgLimit);      
                std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl;
            }
        }

        return ret;
    }

    ~FASReader()
    {
        delete _CodedInputStream;
        delete _IstreamInputStream;
        mFs.close();
    }
};

IsteamInputStream은 std :: istream과 함께 사용할 때 쉽게 발생하는 eofs 및 기타 오류에 매우 취약합니다. 이 후 protobuf 스트림은 영구적으로 손상되고 이미 사용 된 버퍼는 모두 파괴되었습니다. protobuf의 기존 스트림에서 읽기에 대한 지원이 있습니다.

CopyingInputStreamAdaptergoogle::protobuf::io::CopyingInputStream 와 함께 구현 하고 사용 하십시오 . 출력 변형에 동일하게 수행하십시오.

실제로 구문 분석 호출 google::protobuf::io::CopyingInputStream::Read(void* buffer, int size)은 버퍼가 위치 에서 끝납니다 . 남은 것은 어떻게 든 그것을 읽는 것입니다.

다음은 Asio 동기화 스트림 ( SyncReadStream / SyncWriteStream ) 과 함께 사용하는 예입니다 .

#include <google/protobuf/io/zero_copy_stream_impl_lite.h>

using namespace google::protobuf::io;


template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
    public:
        AsioInputStream(SyncReadStream& sock);
        int Read(void* buffer, int size);
    private:
        SyncReadStream& m_Socket;
};


template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
    m_Socket(sock) {}


template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
    std::size_t bytes_read;
    boost::system::error_code ec;
    bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);

    if(!ec) {
        return bytes_read;
    } else if (ec == boost::asio::error::eof) {
        return 0;
    } else {
        return -1;
    }
}


template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
    public:
        AsioOutputStream(SyncWriteStream& sock);
        bool Write(const void* buffer, int size);
    private:
        SyncWriteStream& m_Socket;
};


template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
    m_Socket(sock) {}


template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{   
    boost::system::error_code ec;
    m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
    return !ec;
}

용법 :

AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);

Message protoMessage;
uint32_t msg_size;

/* Read message size */
if(!cis.ReadVarint32(&msg_size)) {
    // Handle error
 }

/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!msg.ParseFromCodedStream(&cis)) {
    // Handle error
}

/* Remove limit */
cis.PopLimit(msg_limit);

C ++와 Python에서 동일한 문제가 발생했습니다.

C ++ 버전의 경우 Kenton Varda 가이 버전에 게시 한 코드와 그가 protobuf 팀에 전송 된 풀 요청의 코드를 혼합하여 사용했습니다 (여기에 게시 된 버전은 EOF를 처리하지 않는 반면 github에 보낸 버전은 처리하지 않습니다 때문입니다). ).

#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>


bool writeDelimitedTo(const google::protobuf::MessageLite& message,
    google::protobuf::io::ZeroCopyOutputStream* rawOutput)
{
    // We create a new coded stream for each message.  Don't worry, this is fast.
    google::protobuf::io::CodedOutputStream output(rawOutput);

    // Write the size.
    const int size = message.ByteSize();
    output.WriteVarint32(size);

    uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
    if (buffer != NULL)
    {
        // Optimization:  The message fits in one buffer, so use the faster
        // direct-to-array serialization path.
        message.SerializeWithCachedSizesToArray(buffer);
    }

    else
    {
        // Slightly-slower path when the message is multiple buffers.
        message.SerializeWithCachedSizes(&output);
        if (output.HadError())
            return false;
    }

    return true;
}

bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
{
    // We create a new coded stream for each message.  Don't worry, this is fast,
    // and it makes sure the 64MB total size limit is imposed per-message rather
    // than on the whole stream.  (See the CodedInputStream interface for more
    // info on this limit.)
    google::protobuf::io::CodedInputStream input(rawInput);
    const int start = input.CurrentPosition();
    if (clean_eof)
        *clean_eof = false;


    // Read the size.
    uint32_t size;
    if (!input.ReadVarint32(&size))
    {
        if (clean_eof)
            *clean_eof = input.CurrentPosition() == start;
        return false;
    }
    // Tell the stream not to read beyond that size.
    google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);

    // Parse the message.
    if (!message->MergeFromCodedStream(&input)) return false;
    if (!input.ConsumedEntireMessage()) return false;

    // Release the limit.
    input.PopLimit(limit);

    return true;
}

그리고 여기 내 python2 구현이 있습니다.

from google.protobuf.internal import encoder
from google.protobuf.internal import decoder

#I had to implement this because the tools in google.protobuf.internal.decoder
#read from a buffer, not from a file-like objcet
def readRawVarint32(stream):
    mask = 0x80 # (1 << 7)
    raw_varint32 = []
    while 1:
        b = stream.read(1)
        #eof
        if b == "":
            break
        raw_varint32.append(b)
        if not (ord(b) & mask):
            #we found a byte starting with a 0, which means it's the last byte of this varint
            break
    return raw_varint32

def writeDelimitedTo(message, stream):
    message_str = message.SerializeToString()
    delimiter = encoder._VarintBytes(len(message_str))
    stream.write(delimiter + message_str)

def readDelimitedFrom(MessageType, stream):
    raw_varint32 = readRawVarint32(stream)
    message = None

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message = MessageType()
        message.ParseFromString(data)

    return message

#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version 
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
    raw_varint32 = readRawVarint32(stream)

    if raw_varint32:
        size, _ = decoder._DecodeVarint32(raw_varint32, 0)

        data = stream.read(size)
        if len(data) < size:
            raise Exception("Unexpected end of file")

        message.ParseFromString(data)

        return message
    else:
        return None

그것은 가장 잘 보이는 코드가 아닐 수도 있고 상당히 리팩토링 될 수 확신하지만 그렇게하는 한 가지 방법을 보여줄 것입니다.

이제 큰 문제는 SLOW 입니다.

python-protobuf의 C ++ 구현을 사용하더라도 순수한 C ++보다 훨씬 느립니다. 파일에서 각각 ~ 30 바이트의 10M protobuf 메시지를 읽는 벤치 마크가 있습니다. C ++에서는 0.9 초, 파이썬에서는 35 초가 걸립니다.

좀 더 빠르게 만드는 한 가지 방법은 varint 디코더를 다시 구현하여 파일에서 읽고 한 번에 디코딩하도록 만드는 것입니다. 파일에서 읽은 다음이 코드가 현재 수행하는 것처럼 디코딩하는 대신입니다. (프로파일 링은 varint 인코더 / 디코더에서 상당한 시간이 소요됨을 보여줍니다). 그러나 말할 필요도없이 파이썬 버전과 C ++ 버전 사이의 격차를 좁히기에 충분하지 않습니다.

더 빠르게 만드는 아이디어는 매우 환영합니다. :)


이것에 대한 해결책도 찾고있었습니다. 다음은 일부 Java 코드 writeDelimitedTo가 파일에 많은 MyRecord 메시지를 작성했다고 가정하는 솔루션의 핵심입니다 . 다음을 수행하여 파일 및 루프를 엽니 다.

if (someCodedInputStream-> ReadVarint32 (& bytes)) {
  CodedInputStream :: Limit msgLimit = someCodedInputStream-> PushLimit (bytes);
  if (myRecord-> ParseFromCodedStream (someCodedInputStream)) {
    // 파싱 된 MyRecord 인스턴스로 작업 수행
  } else {
    // 파싱 오류 처리
  }
  someCodedInputStream-> PopLimit (msgLimit);
} else {
  // 파일의 끝일 수 있음
}

도움이 되었기를 바랍니다.


Objective-c 버전의 프로토콜 버퍼로 작업하면서이 정확한 문제가 발생했습니다. iOS 클라이언트에서 첫 번째 바이트로 길이를 예상하는 parseDelimitedFrom을 사용하는 Java 기반 서버로 전송할 때 먼저 CodedOutputStream에 writeRawByte를 호출해야했습니다. 이 문제를 겪는 다른 사람들을 돕기 위해 여기에 게시하십시오. 이 문제를 해결하는 동안 Google proto-bufs가이 작업을 수행하는 단순한 플래그와 함께 제공 될 것이라고 생각할 것입니다.

    Request* request = [rBuild build];

    [self sendMessage:request];
} 


- (void) sendMessage:(Request *) request {

    //** get length
    NSData* n = [request data];
    uint8_t len = [n length];

    PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream];
    //** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly
    [os writeRawByte:len];
    [request writeToCodedOutputStream:os];
    [os flush];
}

위의 Kenton Varda의 답변에 대한 주석으로 이것을 쓸 수 없기 때문에; 나는 그가 게시 한 코드 (및 제공된 다른 답변)에 버그가 있다고 생각합니다. 다음 코드 :

...
google::protobuf::io::CodedInputStream input(rawInput);

// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...

입력에서 이미 읽은 varint32의 크기를 고려하지 않기 때문에 잘못된 제한을 설정합니다. 이로 인해 다음 메시지의 일부가 될 수있는 스트림에서 추가 바이트를 읽을 때 데이터 손실 / 손상이 발생할 수 있습니다. 이를 올바르게 처리하는 일반적인 방법은 크기를 읽는 데 사용되는 CodedInputStream을 삭제하고 페이로드를 읽기위한 새 파일을 만드는 것입니다.

...
uint32_t size;
{
  google::protobuf::io::CodedInputStream input(rawInput);

  // Read the size.
  if (!input.ReadVarint32(&size)) return false;
}

google::protobuf::io::CodedInputStream input(rawInput);

// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
    input.PushLimit(size);
...

지정된 구분 기호를 사용하여 스트림에서 문자열을 읽는 데 getline을 사용할 수 있습니다.

istream& getline ( istream& is, string& str, char delim );

(헤더에 정의 됨)

참고URL : https://stackoverflow.com/questions/2340730/are-there-c-equivalents-for-the-protocol-buffers-delimited-i-o-functions-in-ja

반응형