| Viewing file:  test_bio_ssl.py (4.46 KB)      -rw-r--r-- Select action/file-type:
 
  (+) |  (+) |  (+) | Code (+) | Session (+) |  (+) | SDB (+) |  (+) |  (+) |  (+) |  (+) |  (+) | 
 
#!/usr/bin/env python
 """Unit tests for M2Crypto.BIO.SSLBio.
 
 Copyright (c) 1999-2002 Ng Pheng Siong. All rights reserved."""
 
 import unittest, threading, sys, socket
 
 from M2Crypto import BIO
 from M2Crypto import SSL
 from M2Crypto import Err
 from M2Crypto import Rand
 from M2Crypto import threading as m2threading
 
 from test_ssl import srv_host, srv_port
 
 class HandshakeClient(threading.Thread):
     """Thread that drives the client side of an SSL handshake.

     The handshake is run through a BIO.SSLBio whose connection reads from
     and writes to two BIO.MemoryBuffer objects; this thread moves the
     handshake bytes by hand between those buffers and a plain TCP socket
     connected to (host, port).
     """

     def __init__(self, host, port):
         """Store the address of the server the thread will connect to."""
         threading.Thread.__init__(self)
         self.host = host
         self.port = port

     def run(self):
         """Connect to the server and pump the handshake until it completes.

         Calls sys.exit() (ending this thread) when do_handshake() fails
         without asking to be retried after a read, or when the peer closes
         the socket before the handshake is done.  The socket is closed on
         every exit path.
         """
         ctx = SSL.Context()
         ctx.load_cert_chain("server.pem")
         conn = SSL.Connection(ctx)
         sslbio = BIO.SSLBio()
         readbio = BIO.MemoryBuffer()
         writebio = BIO.MemoryBuffer()
         sslbio.set_ssl(conn)
         conn.set_bio(readbio, writebio)
         conn.set_connect_state()
         sock = socket.socket()
         try:
             sock.connect((self.host, self.port))

             handshake_complete = False
             while not handshake_complete:
                 ret = sslbio.do_handshake()
                 if ret > 0:
                     handshake_complete = True
                 elif not sslbio.should_retry() or not sslbio.should_read():
                     print(Err.get_error())
                     sys.exit("unrecoverable error in handshake - client")

                 # Forward whatever the SSL layer queued for the peer.  This
                 # also runs after the final, successful do_handshake() call,
                 # which may itself have produced output the server is still
                 # waiting for (the server loop flushes the same way).
                 output_token = writebio.read()
                 if output_token is not None:
                     sock.sendall(output_token)
                 elif not handshake_complete:
                     # Nothing to send, so the SSL layer is waiting for input.
                     input_token = sock.recv(1024)
                     if not input_token:
                         # Empty read means the peer closed the socket;
                         # without this check the loop would spin forever.
                         sys.exit("connection closed by peer during "
                                  "handshake - client")
                     readbio.write(input_token)
         finally:
             sock.close()
 
 
 class SSLTestCase(unittest.TestCase):
     """Tests for BIO.SSLBio: attaching a connection and handshaking.

     Test methods use the 'check_' prefix and are collected by suite().
     """

     def setUp(self):
         """Create a fresh SSLBio for every test."""
         self.sslbio = BIO.SSLBio()

     def check_set_ssl(self):
         """set_ssl() accepts a newly created connection without raising."""
         ctx = SSL.Context()
         conn = SSL.Connection(ctx)
         self.sslbio.set_ssl(conn)

     def check_do_handshake_fail(self):
         """do_handshake() returns 0 for a client connection with no BIOs set."""
         ctx = SSL.Context()
         conn = SSL.Connection(ctx)
         conn.set_connect_state()
         self.sslbio.set_ssl(conn)
         ret = self.sslbio.do_handshake()
         self.assertEqual(ret, 0)

     def check_should_retry_fail(self):
         """After a failed handshake (-1), should_retry() reports 0."""
         ctx = SSL.Context()
         conn = SSL.Connection(ctx)
         self.sslbio.set_ssl(conn)
         ret = self.sslbio.do_handshake()
         self.assertEqual(ret, -1)
         ret = self.sslbio.should_retry()
         self.assertEqual(ret, 0)

     def check_should_write_fail(self):
         """After a failed handshake (-1), should_write() reports 0."""
         ctx = SSL.Context()
         conn = SSL.Connection(ctx)
         self.sslbio.set_ssl(conn)
         ret = self.sslbio.do_handshake()
         self.assertEqual(ret, -1)
         ret = self.sslbio.should_write()
         self.assertEqual(ret, 0)

     def check_should_read_fail(self):
         """After a failed handshake (-1), should_read() reports 0."""
         ctx = SSL.Context()
         conn = SSL.Connection(ctx)
         self.sslbio.set_ssl(conn)
         ret = self.sslbio.do_handshake()
         self.assertEqual(ret, -1)
         ret = self.sslbio.should_read()
         self.assertEqual(ret, 0)

     def check_do_handshake_succeed(self):
         """Complete a server-side handshake against a HandshakeClient thread.

         The server side runs in this thread: bytes received from the client
         socket are fed into the connection's read buffer, do_handshake() is
         called, and anything left in the write buffer is sent back.
         """
         ctx = SSL.Context()
         ctx.load_cert_chain("server.pem")
         conn = SSL.Connection(ctx)
         self.sslbio.set_ssl(conn)
         readbio = BIO.MemoryBuffer()
         writebio = BIO.MemoryBuffer()
         conn.set_bio(readbio, writebio)
         conn.set_accept_state()

         sock = socket.socket()
         new_sock = None
         handshake_client = None
         try:
             sock.bind((srv_host, srv_port))
             sock.listen(5)
             handshake_client = HandshakeClient(srv_host, srv_port)
             handshake_client.start()
             new_sock, _ = sock.accept()

             handshake_complete = False
             while not handshake_complete:
                 input_token = new_sock.recv(1024)
                 if not input_token:
                     # Empty read means the client hung up; without this
                     # check the loop would spin forever.
                     self.fail("client closed the connection during "
                               "handshake")
                 readbio.write(input_token)

                 ret = self.sslbio.do_handshake()
                 if ret > 0:
                     handshake_complete = True
                 elif (not self.sslbio.should_retry()
                       or not self.sslbio.should_read()):
                     # Report through unittest instead of sys.exit() so the
                     # result shows up as an ordinary test failure.
                     self.fail("unrecoverable error in handshake - server")

                 output_token = writebio.read()
                 if output_token is not None:
                     new_sock.sendall(output_token)

             handshake_client.join()
         finally:
             # Always release the sockets, even when the test fails.
             if new_sock is not None:
                 new_sock.close()
             sock.close()
             if handshake_client is not None:
                 # No-op after the join above on success.  On failure, give
                 # the client a bounded time to notice the closed socket so a
                 # stuck client cannot hang the whole test run.
                 handshake_client.join(5)
 
 def suite():
     """Return a suite holding every 'check_'-prefixed SSLTestCase method.

     Uses a TestLoader with testMethodPrefix instead of unittest.makeSuite,
     which is deprecated and absent from recent Python versions.
     """
     loader = unittest.TestLoader()
     loader.testMethodPrefix = 'check_'
     return loader.loadTestsFromTestCase(SSLTestCase)
 
 
 if __name__ == '__main__':
     # Script entry point: load the random pool file, initialise M2Crypto's
     # threading support, run the suite, then undo both steps.
     Rand.load_file('randpool.dat', -1)
     m2threading.init()
     try:
         unittest.TextTestRunner().run(suite())
     finally:
         # Run the teardown even if the test run is interrupted.
         m2threading.cleanup()
         Rand.save_file('randpool.dat')
 
 |