3
sY]2                 @   s.  d dl Z d dlZd dlZd dlZd dlmZ d dlZejdZej	Z	dd Z
G dd dejZG dd	 d	eZG d
d dejZG dd dejZejdd Zf efddZG dd dejZG dd deZG dd deZeZG dd dejZG dd dejZG dd deZedkr*ej  dS )    N)support	threadingc             C   sT   |j   | j  z4y|j \}}|j  W n tjk
r@   Y nX W d |j  X d S )N)ZlistensetZacceptclosesockettimeout)evtZservZconnZaddr r	   $lib/python3.6/test/test_telnetlib.pyserver   s    
r   c               @   sT   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dd Z
dd ZdS )GeneralTestsc             C   sr   t j | _tjtjtj| _| jjd tj	| j| _
t jt| j| jfd| _| jjd | jj  | jj  d S )N<   )targetargsT)r   ZEventr   r   ZAF_INETZSOCK_STREAMsockZ
settimeoutr   Z	bind_portportZThreadr   threadZ	setDaemonstartwait)selfr	   r	   r
   setUp   s    

zGeneralTests.setUpc             C   s   | j j  | ` d S )N)r   join)r   r	   r	   r
   tearDown#   s    
zGeneralTests.tearDownc             C   s   t jt| j}|jj  d S )N)	telnetlibTelnetHOSTr   r   r   )r   telnetr	   r	   r
   	testBasic'   s    zGeneralTests.testBasicc          	   C   s:   t jt| j}| j|j  W d Q R X | j|j  d S )N)r   r   r   r   ZassertIsNotNone
get_socketZassertIsNone)r   Ztnr	   r	   r
   testContextManager,   s    zGeneralTests.testContextManagerc             C   s\   | j tj d k tjd ztjt| j}W d tjd  X | j|j	j
 d |j	j  d S )N   )
assertTruer   getdefaulttimeoutsetdefaulttimeoutr   r   r   r   assertEqualr   
gettimeoutr   )r   r   r	   r	   r
   testTimeoutDefault1   s    
zGeneralTests.testTimeoutDefaultc             C   sb   | j tj d k tjd ztjt| jd d}W d tjd  X | j |jj	 d k |jj
  d S )Nr    )r   )r!   r   r"   r#   r   r   r   r   r   r%   r   )r   r   r	   r	   r
   testTimeoutNone;   s    
zGeneralTests.testTimeoutNonec             C   s2   t jt| jdd}| j|jj d |jj  d S )Nr    )r   )r   r   r   r   r$   r   r%   r   )r   r   r	   r	   r
   testTimeoutValueF   s    zGeneralTests.testTimeoutValuec             C   s:   t j }|jt| jdd | j|jj d |jj  d S )Nr    )r   )	r   r   openr   r   r$   r   r%   r   )r   r   r	   r	   r
   testTimeoutOpenK   s    zGeneralTests.testTimeoutOpenc             C   sJ   t jt| jdd}|j}| j|j | | j|j |j  |jj  d S )Nr    )r   )	r   r   r   r   r   r$   r   filenor   )r   r   Zt_sockr	   r	   r
   testGettersQ   s
    zGeneralTests.testGettersN)__name__
__module____qualname__r   r   r   r   r&   r'   r(   r*   r,   r	   r	   r	   r
   r      s   

r   c               @   s,   e Zd ZdZf fddZdd Zdd ZdS )	
SocketStubz* a socket proxy that re-defines sendall() c             C   s   t || _g | _d| _d S )NF)listreadswritesblock)r   r2   r	   r	   r
   __init__[   s    
zSocketStub.__init__c             C   s   | j j| d S )N)r3   append)r   datar	   r	   r
   sendall_   s    zSocketStub.sendallc             C   s^   d}x&| j r*t||k r*|| j jd7 }qW t||krZ| j jd||d   |d | }|S )N    r   )r2   lenpopinsert)r   sizeoutr	   r	   r
   recva   s    zSocketStub.recvN)r-   r.   r/   __doc__r5   r8   r?   r	   r	   r	   r
   r0   Y   s   r0   c               @   s,   e Zd Zdd Zdd Zdd Zdd Zd	S )
TelnetAlikec             C   s
   t  d S )N)NotImplementedError)r   r	   r	   r
   r+   k   s    zTelnetAlike.filenoc             C   s   d S )Nr	   )r   r	   r	   r
   r   m   s    zTelnetAlike.closec             C   s
   | j j S )N)r   r4   )r   r	   r	   r
   
sock_availn   s    zTelnetAlike.sock_availc          
   G   s>   t j }tjj| |f|  W d Q R X |  j|j 7  _d S )N)r   Zcaptured_stdoutr   r   msg	_messagesgetvalue)r   rD   r   r>   r	   r	   r
   rD   p   s    
zTelnetAlike.msgN)r-   r.   r/   r+   r   rC   rD   r	   r	   r	   r
   rA   j   s   rA   c               @   sD   e Zd Zdd Zedd ZdddZdd	 Zdd
dZdd Z	dS )MockSelectorc             C   s
   i | _ d S )N)keys)r   r	   r	   r
   r5   x   s    zMockSelector.__init__c             C   s   dS )NgMbP?r	   )r   r	   r	   r
   
resolution{   s    zMockSelector.resolutionNc             C   s   t j|d||}|| j|< |S )Nr   )	selectorsZSelectorKeyrH   )r   fileobjeventsr7   keyr	   r	   r
   register   s    
zMockSelector.registerc             C   s   | j j|S )N)rH   r;   )r   rK   r	   r	   r
   
unregister   s    zMockSelector.unregisterc             C   sH   d}x"| j D ]}t|tr|jj}P qW |r0g S dd | j j D S d S )NFc             S   s   g | ]}||j fqS r	   )rL   ).0rM   r	   r	   r
   
<listcomp>   s    z'MockSelector.select.<locals>.<listcomp>)rH   
isinstancerA   r   r4   values)r   r   r4   rK   r	   r	   r
   select   s    
zMockSelector.selectc             C   s   | j S )N)rH   )r   r	   r	   r
   get_map   s    zMockSelector.get_map)N)N)
r-   r.   r/   r5   propertyrI   rN   rO   rT   rU   r	   r	   r	   r
   rG   v   s   

rG   c             #   s0    fdd}zt j}|t _d V  W d |t _X d S )Nc                 s   t  S )N)r0   )Zignored)r2   r	   r
   new_conn   s    ztest_socket.<locals>.new_conn)r   Zcreate_connection)r2   rW   Zold_connr	   )r2   r
   test_socket   s    
rX   c          
   C   sJ   x | D ]}t |tkst|qW t|  |dd}d|_W dQ R X |S )za return a telnetlib.Telnet object that uses a SocketStub with
        reads queued up to be read dummyr    N)typebytesAssertionErrorrX   rE   )r2   clsxr   r	   r	   r
   test_telnet   s    


r`   c               @   s   e Zd Zdd Zdd ZdS )ExpectAndReadTestCasec             C   s   t j| _tt _d S )N)r   _TelnetSelectorold_selectorrG   )r   r	   r	   r
   r      s    zExpectAndReadTestCase.setUpc             C   s   | j t_d S )N)rc   r   rb   )r   r	   r	   r
   r      s    zExpectAndReadTestCase.tearDownN)r-   r.   r/   r   r   r	   r	   r	   r
   ra      s   ra   c               @   sD   e Zd Zdd Zdd Zdd Zdd Zd	d
 Zdd Zdd Z	dS )	ReadTestsc             C   s|   dg}t |}|jd}| j|d|j|j|jjfd dd ddd g}dj|d	d }t |}|jd}| j|| d	S )zc
        read_until(expected, timeout=None)
        test the blocking version of read_util
        s   xxxmatchyyys   matchs   xxxmatch)rD      x2      yr9   N   )r`   Z
read_untilr$   ZcookedqZrawqr   r2   r   )r   wantr   r7   r2   expectr	   r	   r
   test_read_until   s    

zReadTests.test_read_untilc             C   s@   dd dd dd g}dj |}t|}|j }| j|| dS )zJ
        read_all()
          Read all data until EOF; may block.
        re   i  rg      zr9   N)r   r`   read_allr$   )r   r2   rk   r   r7   r	   r	   r
   test_read_all   s    
zReadTests.test_read_allc             C   sF   t dd g}|j }| jt|dk t  }|j }| jd| dS )zQ
        read_some()
          Read at least one byte or EOF; may block.
        re   i  rh   r9   N)r`   Z	read_somer!   r:   r$   )r   r   r7   r	   r	   r
   test_read_some   s    zReadTests.test_read_somec             C   sz   dd }t |g}t||}d|j_| jd|  d|j_d}x*y|| 7 }W q@ tk
rd   P Y q@X q@W | j|| dS )z
        read_*_eager()
          Read all data available already queued or on the socket,
          without blocking.
        re   d   Tr9   FN)r`   getattrr   r4   r$   EOFError)r   Z	func_namerj   r   funcr7   r	   r	   r
   _read_eager   s    

zReadTests._read_eagerc             C   s   | j d | j d d S )NZ
read_eagerZread_very_eager)ru   )r   r	   r	   r
   test_read_eager   s    
zReadTests.test_read_eagerc             C   s^   dd }t |g}| jd|j  x|jjr6|j  q$W |j }| j|| | jt|j d S )Nre   rq   r9   )r`   r$   read_very_lazyr   r2   	fill_rawqZassertRaisesrs   )r   rj   r   r7   r	   r	   r
   rw      s    

zReadTests.read_very_lazyc             C   s   dd }t |g}| jd|j  d}xLy |j }||7 }|sF|j  W n tk
r^   P Y nX | j|j| q(W | j|| d S )Nre   rq   r9   )r`   r$   Z	read_lazyrx   rs   r!   
startswith)r   rj   r   r7   Z	read_datar	   r	   r
   test_read_lazy  s    
zReadTests.test_read_lazyN)
r-   r.   r/   rl   ro   rp   ru   rv   rw   rz   r	   r	   r	   r
   rd      s   
rd   c               @   s   e Zd ZdddZdd ZdS )nego_collectorNc             C   s   d| _ || _d| _d S )Nr9   )seen	sb_gettersb_seen)r   r}   r	   r	   r
   r5     s    znego_collector.__init__c             C   s<   |  j || 7  _ |tjkr8| jr8| j }|  j|7  _d S )N)r|   tlSEr}   r~   )r   r   cmdZoptZsb_datar	   r	   r
   do_nego  s    znego_collector.do_nego)N)r-   r.   r/   r5   r   r	   r	   r	   r
   r{     s   
r{   c               @   s   e Zd ZdZdd ZdS )
WriteTestszKThe only thing that write does is replace each tl.IAC for
    tl.IAC+tl.IACc             C   s~   ddt j d dt j t j d t j t jdg}xH|D ]@}t }|j| dj|jj}| j|jt jt jt j | q6W d S )Ns   data sample without IACs   data sample withs    one IACs   a fews    iacsr9   )	r   IACr`   writer   r   r3   r$   replace)r   Zdata_sampler7   r   Zwrittenr	   r	   r
   
test_write'  s    

zWriteTests.test_writeN)r-   r.   r/   r@   r   r	   r	   r	   r
   r   #  s   r   c               @   s`   e Zd Zejejejejejej	ej
ejgZdd Zdd Zdd Zdd Zd	d
 Zdd ZdS )OptionTestsc             C   s   t |}tdj|}t }|j|j |j }|j}| jt|dk | j	|dd | j
 | j|dd tj | j|t||  d|_dS )z helper for testing IAC + cmd r9   r   Nrh      )r`   r:   r   r{   set_option_negotiation_callbackr   rn   r|   r!   assertIncmdsr$   r   ZNOOPTr}   )r   r7   r   Zdata_lennegotxtr   r	   r	   r
   _test_command7  s    zOptionTests._test_commandc             C   sj   xN| j D ]D}| jtj|g | jdd tj|dd g | jdtj|dg qW | jdd | j D  d S )	Nre   rq   rg   
   c             S   s   g | ]}t j| qS r	   )r   r   )rP   r   r	   r	   r
   rQ   K  s    z1OptionTests.test_IAC_commands.<locals>.<listcomp>s
   xxxxxxxxxxs
   yyyyyyyyyy)r   r   r   r   )r   r   r	   r	   r
   test_IAC_commandsE  s
    zOptionTests.test_IAC_commandsc             C   s0  t jt j t j t j t jt j t j t j t j t j t jt j t j t j d t j t j t jt j d t j t j t j t j t jt j d t j t j d t j t j g}t|}t|j}|j|j |j	 }| j
|d t jt j d t j d t j d }| j
|j| | j
d|j  d |_d S )Ns   aas   bbs   ccs   ddr9   s   aabb)r   r   ZSBr   r`   r{   Zread_sb_datar   r   rn   r$   r~   r}   )r   sendr   r   r   Zwant_sb_datar	   r	   r
   test_SB_commandsM  s    "&&.
$zOptionTests.test_SB_commandsc             C   s   d
t jtdg dft jt j tdg dft jt j tdg dft jt j tdg dft jt j tdg d	fg}x:|D ]2\}}t|g}|jd |j	 }| j
||j q~W d S )N   a: recv b''
X   z: IAC 88 not recognized
rh   z: IAC DO 1
z: IAC DONT 1
z: IAC WILL 1
z: IAC WONT 1
)r   r   )r   r   r\   ZDOZDONTZWILLZWONTr`   set_debuglevelrn   r   rE   )r   Zgiven_a_expect_babr   r   r	   r	   r
   test_debuglevel_reads_  s    

z!OptionTests.test_debuglevel_readsc             C   s0   t  }|jd |jd d}| j||j d S )Nrh   s   xxxzsend b'xxx'
)r`   r   r   r   rE   )r   r   Zexpectedr	   r	   r
   test_debuglevel_writer  s
    

z!OptionTests.test_debuglevel_writec          
   C   sJ   t g  tdd}d|_W d Q R X |jd |jd | j|jd d S )NrY   0rZ   rh   testz0.*test)rX   rA   rE   r   rD   ZassertRegex)r   r   r	   r	   r
   test_debug_accepts_str_porty  s    



z'OptionTests.test_debug_accepts_str_portN)r-   r.   r/   r   ZAOZAYTZBRKZECZELZGAZIPZNOPr   r   r   r   r   r   r   r	   r	   r	   r
   r   3  s   $r   c               @   s   e Zd Zdd ZdS )ExpectTestsc             C   sB   ddd	g}t |}|jdg\}}}| j|dj|dd
  dS )z
        expect(expected, [timeout])
          Read until the expected string has been seen, or a timeout is
          hit (default is no timeout); may block.
        re   r   s   matchrg   r9   Nrh   s
   xxxxxxxxxxs
   yyyyyyyyyyri   )r`   rk   r$   r   )r   rj   r   _r7   r	   r	   r
   test_expect  s    
zExpectTests.test_expectN)r-   r.   r/   r   r	   r	   r	   r
   r     s   r   __main__)r   rJ   r   
contextlibr   r   Zunittestimport_moduler   r   r   ZTestCaser   objectr0   r   rA   ZBaseSelectorrG   contextmanagerrX   r`   ra   rd   r{   r   r   r   r   r-   mainr	   r	   r	   r
   <module>   s.   
B 
bP
