Enhancing Cross-Language Protobuf Protocol for Improved Data Handling
Introduction
In modern software development, ensuring efficient and reliable data exchange between different programming languages is crucial. This project presents a significant enhancement to the protobuf protocol used between Python and Kotlin, addressing critical issues in the existing implementation and introducing a lightweight solution.
Problem in Existing Protobuf Python SDK
The previous implementation of the protobuf Python SDK did not transmit the message length separately. This omission made it impossible to read only the message length from a stream, leading to problems in scenarios involving continuous data exchange. Without knowing the exact size of incoming messages, the system was prone to errors from redundant data, especially in persistent connections.
Actually, there are methods like parseFromDelimited
and writeToDelimited
in the Java SDK that can be used to read and write length-delimited messages. However, these methods are not available in the Python SDK.
Related Posts
Here are some related posts that I found while researching this issue:
- Similar issues in C#
- Receive delimited Protobuf message in python via TCP
- Length-Delimited Protobuf Streams
Proposed Lightweight Solution
To address the aforementioned issue, I introduced a straightforward yet effective enhancement: prefixing each message with its length. This simple modification ensures that both the sending and receiving ends of a communication link can accurately determine the size of the data being exchanged, thus preventing the transmission of excess data and reducing the likelihood of data corruption.
Implementation Details
Below are snippets of the code used in both Python and Kotlin to implement this solution:
Python (Sender)
import struct
import socket
import action_space_pb2
from typing import List
def send_action2(sock: socket.socket, action_array: List[int]):
# Make a message
action_space = action_space_pb2.ActionSpaceMessage()
action_space.action.extend(action_array)
action_space.command = ""
# Serialize the message
v = action_space.SerializeToString()
# First send the length of the message
sock.send(struct.pack("<I", len(v)))
# Then send the message
sock.sendall(v)
Python (Reader)
import struct
import socket
import observation_space_pb2
def read_one_observation(sock: socket.socket) -> (int, ObsType):
# First read the length of the message
data_len_bytes = sock.read(4, True)
# Unpack the length
data_len = struct.unpack("<I", data_len_bytes)[0]
# Then read the message
data_bytes = sock.read(data_len, True)
# Parse the message
observation_space = observation_space_pb2.ObservationSpaceMessage()
observation_space.ParseFromString(data_bytes)
return data_len, observation_space
Kotlin(Sender)
fun writeObservation(observationSpace: ObservationSpace.ObservationSpaceMessage) {
// Prepare buffer for writing
val bufferSize = 4 + observationSpace.serializedSize
val buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN)
// Write message length
buffer.putInt(observationSpace.serializedSize)
// Write message
val byteArrayOutputStream = ByteArrayOutputStream()
observationSpace.writeTo(byteArrayOutputStream)
buffer.put(byteArrayOutputStream.toByteArray())
// Flip the buffer to read mode (buffer -> channel)
buffer.flip()
// Write buffer to SocketChannel
while (buffer.hasRemaining()) {
socketChannel.write(buffer)
}
}
Kotlin(Reader)
fun readAction(): ActionSpace.ActionSpaceMessage {
// First read the length of the message
val buffer = ByteBuffer.allocate(Integer.BYTES) // 4 bytes
socketChannel.fillBuffer(buffer)
buffer.flip()
val len = buffer.order(ByteOrder.LITTLE_ENDIAN).int
// Then read the message
val bytes = socketChannel.readNBytes(len)
// Parse the message
val actionSpace = ActionSpace.ActionSpaceMessage.parseFrom(bytes)
return actionSpace
}
In Kotlin side, we used an extension function readNBytes
to read len
bytes from the socket channel. The fillBuffer
function is used to fill the buffer with data from the socket channel.
fun SocketChannel.fillBuffer(buffer: ByteBuffer) {
while (buffer.hasRemaining()) {
read(buffer)
}
}
fun SocketChannel.readNBytes(len: Int): ByteArray {
val buffer = ByteBuffer.allocate(len)
var readLen = 0
while (readLen < len) {
val bytesRead = read(buffer)
if (bytesRead == -1) {
throw IOException("EOF")
}
readLen += bytesRead
}
return buffer.array()
}
Conclusion
This enhancement to the protobuf protocol not only improves the reliability of data transmission between Python and Kotlin but also serves as a model for similar improvements in other cross-language implementations. By ensuring precise data handling and reducing redundancy, this solution contributes to more robust and scalable systems.