Java에서 프로토콜 기능에 해당하는 C ++이 있습니까?
C ++ 및 Java 모두에서 파일에서 여러 프로토콜 버퍼 메시지를 읽거나 쓰려고합니다. Google은 메시지 길이 접두사를 쓰는 것을 제안하지만 기본적으로 그렇게 할 수있는 방법은 없습니다 (내가 볼 수 있음).
그러나 버전 2.1.0의 Java API는 분명히 해당 작업을 수행하는 "구분 된"I / O 함수 집합을 선언합니다.
parseDelimitedFrom
mergeDelimitedFrom
writeDelimitedTo
C ++에 존재하는 것이 있습니까? 어떤 경우에는 Java API가 첨부하는 크기 접두사에 대한 연결 형식은 무엇입니까? 그러면 C ++에서 해당 메시지를 구문 분석 할 수 있습니까?
최신 정보 :
이제 v3.3.0부터 존재합니다 .google/protobuf/util/delimited_message_util.h
나는 여기 파티에 조금 늦었지만 아래 구현 에는 다른 답변에서 누락 된 일부 최적화가 포함되어 있으며 64메가바이트의 입력 후에도 실패하지 않을 것입니다 ( 전체 스트림이 아닌 각 개별 메시지에 64메가바이트 제한 을 여전히 적용하지만 ).
(저는 C ++ 및 Java protobuf 라이브러리의 작성자이지만 더 이상 Google에서 일하지 않습니다.이 코드가 공식 라이브러리에 발생하지 않았거나 갑자기 발생하는 점에 대해 이렇게 생겼습니다.)
bool writeDelimitedTo(
const google::protobuf::MessageLite& message,
google::protobuf::io::ZeroCopyOutputStream* rawOutput) {
// We create a new coded stream for each message. Don't worry, this is fast.
google::protobuf::io::CodedOutputStream output(rawOutput);
// Write the size.
const int size = message.ByteSize();
output.WriteVarint32(size);
uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
if (buffer != NULL) {
// Optimization: The message fits in one buffer, so use the faster
// direct-to-array serialization path.
message.SerializeWithCachedSizesToArray(buffer);
} else {
// Slightly-slower path when the message is multiple buffers.
message.SerializeWithCachedSizes(&output);
if (output.HadError()) return false;
}
return true;
}
bool readDelimitedFrom(
google::protobuf::io::ZeroCopyInputStream* rawInput,
google::protobuf::MessageLite* message) {
// We create a new coded stream for each message. Don't worry, this is fast,
// and it makes sure the 64MB total size limit is imposed per-message rather
// than on the whole stream. (See the CodedInputStream interface for more
// info on this limit.)
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
// Parse the message.
if (!message->MergeFromCodedStream(&input)) return false;
if (!input.ConsumedEntireMessage()) return false;
// Release the limit.
input.PopLimit(limit);
return true;
}
좋아, 그래서 나는 필요한 것을 구현하는 C ++ 함수를 내장 수지 만, Java API 참조를 내가 포함하지 않는 내부에서 다음과 같이 MessageLite 인터페이스를 구현 합니다.
void writeDelimitedTo(OutputStream output)
/* Like writeTo(OutputStream), but writes the size of
the message as a varint before writing the data. */
따라서 Java 크기 접두사는 (Protocol Buffers) varint입니다!
그 정보로 무장 한 저는 C ++ API를 보고 다음과 같은 CodedStream 헤더를 찾았 습니다 .
bool CodedInputStream::ReadVarint32(uint32 * value)
void CodedOutputStream::WriteVarint32(uint32 value)
이를 사용하여 작업을 수행하는 자체 C ++ 함수를 롤링 할 수 있어야합니다.
그래도 주 메시지 API에 추가해야합니다. Marc Gravell의 우수한 protobuf-net C # 포트 (SerializeWithLengthPrefix 및 DeserializeWithLengthPrefix를 통해)도 마찬가지입니다.
CodedOutputStream / ArrayOutputStream을 사용하여 메시지 (크기 포함)를 작성하고 CodedInputStream / ArrayInputStream을 사용하여 메시지 (크기 포함)를 읽는 모든 문제를 해결했습니다.
예를 들어, 다음 의사 코드는 메시지 다음에 메시지 크기를 씁니다.
const unsigned bufLength = 256;
unsigned char buffer[bufLength];
Message protoMessage;
google::protobuf::io::ArrayOutputStream arrayOutput(buffer, bufLength);
google::protobuf::io::CodedOutputStream codedOutput(&arrayOutput);
codedOutput.WriteLittleEndian32(protoMessage.ByteSize());
protoMessage.SerializeToCodedStream(&codedOutput);
글을 쓸 때 버퍼가 메시지에 맞도록 충분히 큰지 확인해야합니다 (크기 포함). 그리고 읽을 때 버퍼에 전체 메시지 (크기 포함)가 포함되어 있는지 확인해야합니다.
Java API에서 제공하는 것과 같은 방법 방법을 C ++ API에 추가하면 확실히 편리 할 것입니다.
여기 있습니다 :
#include <google/protobuf/io/zero_copy_stream_impl.h>
#include <google/protobuf/io/coded_stream.h>
using namespace google::protobuf::io;
class FASWriter
{
std::ofstream mFs;
OstreamOutputStream *_OstreamOutputStream;
CodedOutputStream *_CodedOutputStream;
public:
FASWriter(const std::string &file) : mFs(file,std::ios::out | std::ios::binary)
{
assert(mFs.good());
_OstreamOutputStream = new OstreamOutputStream(&mFs);
_CodedOutputStream = new CodedOutputStream(_OstreamOutputStream);
}
inline void operator()(const ::google::protobuf::Message &msg)
{
_CodedOutputStream->WriteVarint32(msg.ByteSize());
if ( !msg.SerializeToCodedStream(_CodedOutputStream) )
std::cout << "SerializeToCodedStream error " << std::endl;
}
~FASWriter()
{
delete _CodedOutputStream;
delete _OstreamOutputStream;
mFs.close();
}
};
class FASReader
{
std::ifstream mFs;
IstreamInputStream *_IstreamInputStream;
CodedInputStream *_CodedInputStream;
public:
FASReader(const std::string &file), mFs(file,std::ios::in | std::ios::binary)
{
assert(mFs.good());
_IstreamInputStream = new IstreamInputStream(&mFs);
_CodedInputStream = new CodedInputStream(_IstreamInputStream);
}
template<class T>
bool ReadNext()
{
T msg;
unsigned __int32 size;
bool ret;
if ( ret = _CodedInputStream->ReadVarint32(&size) )
{
CodedInputStream::Limit msgLimit = _CodedInputStream->PushLimit(size);
if ( ret = msg.ParseFromCodedStream(_CodedInputStream) )
{
_CodedInputStream->PopLimit(msgLimit);
std::cout << mFeed << " FASReader ReadNext: " << msg.DebugString() << std::endl;
}
}
return ret;
}
~FASReader()
{
delete _CodedInputStream;
delete _IstreamInputStream;
mFs.close();
}
};
IsteamInputStream은 std :: istream과 함께 사용할 때 쉽게 발생하는 eofs 및 기타 오류에 매우 취약합니다. 이 후 protobuf 스트림은 영구적으로 손상되고 이미 사용 된 버퍼는 모두 파괴되었습니다. protobuf의 기존 스트림에서 읽기에 대한 지원이 있습니다.
CopyingInputStreamAdaptergoogle::protobuf::io::CopyingInputStream
와 함께 구현 하고 사용 하십시오 . 출력 변형에 동일하게 수행하십시오.
실제로 구문 분석 호출 google::protobuf::io::CopyingInputStream::Read(void* buffer, int size)
은 버퍼가 위치 에서 끝납니다 . 남은 것은 어떻게 든 그것을 읽는 것입니다.
다음은 Asio 동기화 스트림 ( SyncReadStream / SyncWriteStream ) 과 함께 사용하는 예입니다 .
#include <google/protobuf/io/zero_copy_stream_impl_lite.h>
using namespace google::protobuf::io;
template <typename SyncReadStream>
class AsioInputStream : public CopyingInputStream {
public:
AsioInputStream(SyncReadStream& sock);
int Read(void* buffer, int size);
private:
SyncReadStream& m_Socket;
};
template <typename SyncReadStream>
AsioInputStream<SyncReadStream>::AsioInputStream(SyncReadStream& sock) :
m_Socket(sock) {}
template <typename SyncReadStream>
int
AsioInputStream<SyncReadStream>::Read(void* buffer, int size)
{
std::size_t bytes_read;
boost::system::error_code ec;
bytes_read = m_Socket.read_some(boost::asio::buffer(buffer, size), ec);
if(!ec) {
return bytes_read;
} else if (ec == boost::asio::error::eof) {
return 0;
} else {
return -1;
}
}
template <typename SyncWriteStream>
class AsioOutputStream : public CopyingOutputStream {
public:
AsioOutputStream(SyncWriteStream& sock);
bool Write(const void* buffer, int size);
private:
SyncWriteStream& m_Socket;
};
template <typename SyncWriteStream>
AsioOutputStream<SyncWriteStream>::AsioOutputStream(SyncWriteStream& sock) :
m_Socket(sock) {}
template <typename SyncWriteStream>
bool
AsioOutputStream<SyncWriteStream>::Write(const void* buffer, int size)
{
boost::system::error_code ec;
m_Socket.write_some(boost::asio::buffer(buffer, size), ec);
return !ec;
}
용법 :
AsioInputStream<boost::asio::ip::tcp::socket> ais(m_Socket); // Where m_Socket is a instance of boost::asio::ip::tcp::socket
CopyingInputStreamAdaptor cis_adp(&ais);
CodedInputStream cis(&cis_adp);
Message protoMessage;
uint32_t msg_size;
/* Read message size */
if(!cis.ReadVarint32(&msg_size)) {
// Handle error
}
/* Make sure not to read beyond limit of message */
CodedInputStream::Limit msg_limit = cis.PushLimit(msg_size);
if(!msg.ParseFromCodedStream(&cis)) {
// Handle error
}
/* Remove limit */
cis.PopLimit(msg_limit);
C ++와 Python에서 동일한 문제가 발생했습니다.
C ++ 버전의 경우 Kenton Varda 가이 버전에 게시 한 코드와 그가 protobuf 팀에 전송 된 풀 요청의 코드를 혼합하여 사용했습니다 (여기에 게시 된 버전은 EOF를 처리하지 않는 반면 github에 보낸 버전은 처리하지 않습니다 때문입니다). ).
#include <google/protobuf/message_lite.h>
#include <google/protobuf/io/zero_copy_stream.h>
#include <google/protobuf/io/coded_stream.h>
bool writeDelimitedTo(const google::protobuf::MessageLite& message,
google::protobuf::io::ZeroCopyOutputStream* rawOutput)
{
// We create a new coded stream for each message. Don't worry, this is fast.
google::protobuf::io::CodedOutputStream output(rawOutput);
// Write the size.
const int size = message.ByteSize();
output.WriteVarint32(size);
uint8_t* buffer = output.GetDirectBufferForNBytesAndAdvance(size);
if (buffer != NULL)
{
// Optimization: The message fits in one buffer, so use the faster
// direct-to-array serialization path.
message.SerializeWithCachedSizesToArray(buffer);
}
else
{
// Slightly-slower path when the message is multiple buffers.
message.SerializeWithCachedSizes(&output);
if (output.HadError())
return false;
}
return true;
}
bool readDelimitedFrom(google::protobuf::io::ZeroCopyInputStream* rawInput, google::protobuf::MessageLite* message, bool* clean_eof)
{
// We create a new coded stream for each message. Don't worry, this is fast,
// and it makes sure the 64MB total size limit is imposed per-message rather
// than on the whole stream. (See the CodedInputStream interface for more
// info on this limit.)
google::protobuf::io::CodedInputStream input(rawInput);
const int start = input.CurrentPosition();
if (clean_eof)
*clean_eof = false;
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size))
{
if (clean_eof)
*clean_eof = input.CurrentPosition() == start;
return false;
}
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit = input.PushLimit(size);
// Parse the message.
if (!message->MergeFromCodedStream(&input)) return false;
if (!input.ConsumedEntireMessage()) return false;
// Release the limit.
input.PopLimit(limit);
return true;
}
그리고 여기 내 python2 구현이 있습니다.
from google.protobuf.internal import encoder
from google.protobuf.internal import decoder
#I had to implement this because the tools in google.protobuf.internal.decoder
#read from a buffer, not from a file-like objcet
def readRawVarint32(stream):
mask = 0x80 # (1 << 7)
raw_varint32 = []
while 1:
b = stream.read(1)
#eof
if b == "":
break
raw_varint32.append(b)
if not (ord(b) & mask):
#we found a byte starting with a 0, which means it's the last byte of this varint
break
return raw_varint32
def writeDelimitedTo(message, stream):
message_str = message.SerializeToString()
delimiter = encoder._VarintBytes(len(message_str))
stream.write(delimiter + message_str)
def readDelimitedFrom(MessageType, stream):
raw_varint32 = readRawVarint32(stream)
message = None
if raw_varint32:
size, _ = decoder._DecodeVarint32(raw_varint32, 0)
data = stream.read(size)
if len(data) < size:
raise Exception("Unexpected end of file")
message = MessageType()
message.ParseFromString(data)
return message
#In place version that takes an already built protobuf object
#In my tests, this is around 20% faster than the other version
#of readDelimitedFrom()
def readDelimitedFrom_inplace(message, stream):
raw_varint32 = readRawVarint32(stream)
if raw_varint32:
size, _ = decoder._DecodeVarint32(raw_varint32, 0)
data = stream.read(size)
if len(data) < size:
raise Exception("Unexpected end of file")
message.ParseFromString(data)
return message
else:
return None
그것은 가장 잘 보이는 코드가 아닐 수도 있고 상당히 리팩토링 될 수 확신하지만 그렇게하는 한 가지 방법을 보여줄 것입니다.
이제 큰 문제는 SLOW 입니다.
python-protobuf의 C ++ 구현을 사용하더라도 순수한 C ++보다 훨씬 느립니다. 파일에서 각각 ~ 30 바이트의 10M protobuf 메시지를 읽는 벤치 마크가 있습니다. C ++에서는 0.9 초, 파이썬에서는 35 초가 걸립니다.
좀 더 빠르게 만드는 한 가지 방법은 varint 디코더를 다시 구현하여 파일에서 읽고 한 번에 디코딩하도록 만드는 것입니다. 파일에서 읽은 다음이 코드가 현재 수행하는 것처럼 디코딩하는 대신입니다. (프로파일 링은 varint 인코더 / 디코더에서 상당한 시간이 소요됨을 보여줍니다). 그러나 말할 필요도없이 파이썬 버전과 C ++ 버전 사이의 격차를 좁히기에 충분하지 않습니다.
더 빠르게 만드는 아이디어는 매우 환영합니다. :)
이것에 대한 해결책도 찾고있었습니다. 다음은 일부 Java 코드 writeDelimitedTo
가 파일에 많은 MyRecord 메시지를 작성했다고 가정하는 솔루션의 핵심입니다 . 다음을 수행하여 파일 및 루프를 엽니 다.
if (someCodedInputStream-> ReadVarint32 (& bytes)) { CodedInputStream :: Limit msgLimit = someCodedInputStream-> PushLimit (bytes); if (myRecord-> ParseFromCodedStream (someCodedInputStream)) { // 파싱 된 MyRecord 인스턴스로 작업 수행 } else { // 파싱 오류 처리 } someCodedInputStream-> PopLimit (msgLimit); } else { // 파일의 끝일 수 있음 }
도움이 되었기를 바랍니다.
Objective-c 버전의 프로토콜 버퍼로 작업하면서이 정확한 문제가 발생했습니다. iOS 클라이언트에서 첫 번째 바이트로 길이를 예상하는 parseDelimitedFrom을 사용하는 Java 기반 서버로 전송할 때 먼저 CodedOutputStream에 writeRawByte를 호출해야했습니다. 이 문제를 겪는 다른 사람들을 돕기 위해 여기에 게시하십시오. 이 문제를 해결하는 동안 Google proto-bufs가이 작업을 수행하는 단순한 플래그와 함께 제공 될 것이라고 생각할 것입니다.
Request* request = [rBuild build];
[self sendMessage:request];
}
- (void) sendMessage:(Request *) request {
//** get length
NSData* n = [request data];
uint8_t len = [n length];
PBCodedOutputStream* os = [PBCodedOutputStream streamWithOutputStream:outputStream];
//** prepend it to message, such that Request.parseDelimitedFrom(in) can parse it properly
[os writeRawByte:len];
[request writeToCodedOutputStream:os];
[os flush];
}
위의 Kenton Varda의 답변에 대한 주석으로 이것을 쓸 수 없기 때문에; 나는 그가 게시 한 코드 (및 제공된 다른 답변)에 버그가 있다고 생각합니다. 다음 코드 :
...
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
uint32_t size;
if (!input.ReadVarint32(&size)) return false;
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
...
입력에서 이미 읽은 varint32의 크기를 고려하지 않기 때문에 잘못된 제한을 설정합니다. 이로 인해 다음 메시지의 일부가 될 수있는 스트림에서 추가 바이트를 읽을 때 데이터 손실 / 손상이 발생할 수 있습니다. 이를 올바르게 처리하는 일반적인 방법은 크기를 읽는 데 사용되는 CodedInputStream을 삭제하고 페이로드를 읽기위한 새 파일을 만드는 것입니다.
...
uint32_t size;
{
google::protobuf::io::CodedInputStream input(rawInput);
// Read the size.
if (!input.ReadVarint32(&size)) return false;
}
google::protobuf::io::CodedInputStream input(rawInput);
// Tell the stream not to read beyond that size.
google::protobuf::io::CodedInputStream::Limit limit =
input.PushLimit(size);
...
지정된 구분 기호를 사용하여 스트림에서 문자열을 읽는 데 getline을 사용할 수 있습니다.
istream& getline ( istream& is, string& str, char delim );
(헤더에 정의 됨)
'ProgramingTip' 카테고리의 다른 글
! =는 OCaml에서 의미가 있습니까? (0) | 2020.11.27 |
---|---|
RSync : 양방향으로 구매하고 있습니까? (0) | 2020.11.27 |
Python에서 PIL을 사용하여 이미지를 다른 이미지에 어떻게 합성하고 있습니까? (0) | 2020.11.27 |
bash를 사용하여 디렉토리의 모든 파일에서 클래스 경로를 어떻게 만드나요? (0) | 2020.11.27 |
웹킷 스크롤바 스타일을 지정된 요소에 적용 (0) | 2020.11.27 |