Goal:
How to modify HBase Thrift client code when the HBase Thrift Service has framed transport and the compact protocol enabled.
The background is:
To avoid the Thrift service crash described in HBASE-11052, we need to enable framed transport and the compact protocol in hbase-site.xml as shown below, and then restart the HBase Thrift Service:
    <property>
      <name>hbase.regionserver.thrift.framed</name>
      <value>true</value>
    </property>
    <property>
      <name>hbase.regionserver.thrift.framed.max_frame_size_in_mb</name>
      <value>2</value>
    </property>
    <property>
      <name>hbase.regionserver.thrift.compact</name>
      <value>true</value>
    </property>

After that, existing HBase Thrift client code needs to be modified; otherwise it will fail with the error below:

    thrift.transport.TTransport.TTransportException: TSocket read 0 bytes

This article explains what to change in the HBase Thrift client code to make it compatible with framed transport and the compact protocol.
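Before changing client code, it can help to confirm which of the settings above are actually enabled on the server side. The following is a minimal sketch using only the Python standard library. The sample XML string and the helper names `read_thrift_settings` and `client_requirements` are illustrative assumptions, not part of HBase or Thrift, and the sketch assumes that a missing property means the feature is disabled.

```python
import xml.etree.ElementTree as ET

# Hypothetical sample that mirrors the three properties from this article.
# In practice the text would come from the cluster's hbase-site.xml.
SAMPLE_HBASE_SITE = """
<configuration>
  <property><name>hbase.regionserver.thrift.framed</name><value>true</value></property>
  <property><name>hbase.regionserver.thrift.framed.max_frame_size_in_mb</name><value>2</value></property>
  <property><name>hbase.regionserver.thrift.compact</name><value>true</value></property>
</configuration>
"""


def read_thrift_settings(xml_text):
    """Return the hbase.regionserver.thrift.* properties as a dict of strings."""
    root = ET.fromstring(xml_text)
    settings = {}
    for prop in root.iter("property"):
        name = (prop.findtext("name") or "").strip()
        value = (prop.findtext("value") or "").strip()
        if name.startswith("hbase.regionserver.thrift."):
            settings[name] = value
    return settings


def client_requirements(settings):
    """Map server settings to the transport and protocol the client must use."""
    # Assumption: an absent property is treated as "false".
    framed = settings.get("hbase.regionserver.thrift.framed", "false").lower() == "true"
    compact = settings.get("hbase.regionserver.thrift.compact", "false").lower() == "true"
    return {
        "transport": "TFramedTransport" if framed else "TBufferedTransport",
        "protocol": "TCompactProtocol" if compact else "TBinaryProtocol",
    }


settings = read_thrift_settings(SAMPLE_HBASE_SITE)
requirements = client_requirements(settings)
print(requirements)
```

With the sample configuration, the result names `TFramedTransport` and `TCompactProtocol`, which are the two client-side changes described in the Solution section.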
Env:
HBase 1.1.1
MapR 5.2
Solution:
1. If framed transport is enabled
We need to modify FROM:

    from thrift.transport import TTransport
    transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))

TO:

    from thrift.transport.TTransport import TFramedTransport
    transport = TFramedTransport(TSocket.TSocket(host, port))
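To illustrate why the client and server must agree on framing, here is a minimal standard-library sketch of the idea behind framed transport: each message is prefixed with its length as a 4-byte big-endian integer, and the reader rejects sizes outside the allowed range. The helper names `frame` and `unframe` are hypothetical, and this is not the Thrift library's implementation. The 2 MB limit mirrors the max_frame_size_in_mb value from the configuration above.

```python
import struct

# Mirrors hbase.regionserver.thrift.framed.max_frame_size_in_mb = 2
MAX_FRAME_BYTES = 2 * 1024 * 1024


def frame(payload):
    """Prefix payload with its length as a 4-byte big-endian signed integer."""
    return struct.pack("!i", len(payload)) + payload


def unframe(data, max_frame_bytes=MAX_FRAME_BYTES):
    """Read one frame from data and return (payload, remaining bytes)."""
    if len(data) < 4:
        raise ValueError("incomplete frame header")
    (size,) = struct.unpack("!i", data[:4])
    if size < 0 or size > max_frame_bytes:
        raise ValueError("invalid frame size: %d" % size)
    if len(data) < 4 + size:
        raise ValueError("incomplete frame body")
    return data[4:4 + size], data[4 + size:]


framed = frame(b"hello")
payload, rest = unframe(framed)

# An unframed strict binary-protocol call starts with the bytes 0x80 0x01 0x00 0x01.
# Read as a frame length, that is a negative number, so a framed reader rejects it.
unframed_call_start = b"\x80\x01\x00\x01" + b"scan"
try:
    unframe(unframed_call_start)
    rejected = False
except ValueError:
    rejected = True
print(payload, rejected)
```

A server that expects frames cannot make sense of an unframed request like this, which is one plausible way for the client to end up with a closed connection and the "TSocket read 0 bytes" error.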
2. If compact protocol is enabled
We need to modify FROM:

    from thrift.protocol import TBinaryProtocol
    protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)

TO:

    from thrift.protocol import TCompactProtocol
    protocol = TCompactProtocol.TCompactProtocol(transport)
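The two protocols are not wire-compatible, which is why the client has to switch together with the server. As a rough illustration, the compact protocol writes 32-bit integers as zigzag-encoded variable-length integers, while the binary protocol always writes four big-endian bytes. The sketch below uses only the standard library; the function names are hypothetical and it covers just this one data type, not the full protocol.

```python
import struct


def zigzag32(n):
    """Map a signed 32-bit integer to an unsigned one (0, -1, 1, -2 -> 0, 1, 2, 3)."""
    return ((n << 1) ^ (n >> 31)) & 0xFFFFFFFF


def varint(n):
    """Encode an unsigned integer 7 bits at a time, low-order group first."""
    out = bytearray()
    while True:
        if n & ~0x7F == 0:
            out.append(n)  # last group: continuation bit not set
            return bytes(out)
        out.append((n & 0x7F) | 0x80)  # more groups follow: set continuation bit
        n >>= 7


def compact_i32(n):
    """Compact-style encoding of a 32-bit integer: zigzag, then varint."""
    return varint(zigzag32(n))


def binary_i32(n):
    """Binary-style encoding of a 32-bit integer: fixed 4 bytes, big-endian."""
    return struct.pack("!i", n)


for value in (1, -1, 300):
    print(value, len(compact_i32(value)), len(binary_i32(value)))
```

Small values take one or two bytes in the compact form instead of four, and a reader expecting one layout will misinterpret the other.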
A complete example in Python is shown below:

    from thrift.transport import TSocket
    #from thrift.protocol import TBinaryProtocol
    from thrift.protocol import TCompactProtocol
    #from thrift.transport import TTransport
    from thrift.transport.TTransport import TFramedTransport
    from hbase import Hbase

    host = "localhost"
    port = 9090
    tablename = "/user/mapr/some_table"
    numRows = 100
    columnName = "cf1:col1"

    # Connect to HBase Thrift server
    #transport = TTransport.TBufferedTransport(TSocket.TSocket(host, port))
    transport = TFramedTransport(TSocket.TSocket(host, port))
    #protocol = TBinaryProtocol.TBinaryProtocolAccelerated(transport)
    protocol = TCompactProtocol.TCompactProtocol(transport)

    # Create and open the client connection
    client = Hbase.Client(protocol)
    transport.open()

    # Scan the maprdb table
    scan = Hbase.TScan(startRow="111", stopRow="222")
    scannerId = client.scannerOpenWithScan(tablename, scan, None)

    # Fetch rows in batches of numRows until the scanner is exhausted.
    # (No separate scannerGet call first: it would consume and drop the first row.)
    rowList = client.scannerGetList(scannerId, numRows)
    while rowList:
        for row in rowList:
            message = row.columns.get(columnName).value
            rowKey = row.row
            print("rowKey = " + rowKey + ", columnValue = " + message)
        rowList = client.scannerGetList(scannerId, numRows)

    # Release the scanner and close the connection
    client.scannerClose(scannerId)
    transport.close()