You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

1231 lines
38 KiB
Java

// TCPProtocol.java
package protocol; // protocol package
import java.util.*; // import Java utility classes
import support.*; // import Jasper support classes
/**
This is the class for a TCP protocol entity.
@author Iain A. Robin, Kenneth J. Turner
@version 1.0 (1st September 1999, IAR): initial version
<br/> 1.4 (9th March 2006, KJT): updated for JDK 1.5
<br/> 1.5 (26th July 2010, KJT): minor tidying; addition of code to
handle slow starts; empty PDU removed from receive buffer
after reception; state TIME_WAIT now entered after CLOSING or
FIN_WAIT2 and a maximum of two received PDUs allowed in this
state before closing; FIN is now acknowledged in LISTEN state;
a SYN in CLOSING or FIN_WAIT2 state now causes closing
*/
public class TCPProtocol implements ProtocolEntity, Timeouts {
/** Debug flag: when true, setState reports every state change on System.err */
private final static boolean DEBUG = false;
// Protocol state constants (values held in the "state" and "prevState" fields)
/** Closed state */
private final static int CLOSED = 0;
/** Listen state (entered from CLOSED on a passive open) */
private final static int LISTEN = 1;
/** Synchronise received state */
private final static int SYN_RCVD = 2;
/** Synchronise sent state */
private final static int SYN_SENT = 3;
/**
Synchronise pending at closed peer state: a SYN arrived while CLOSED in the
peer role and is held until the local user issues an active open
*/
private final static int SYN_PEND = 4;
/** Established state */
private final static int ESTABLISHED = 5;
/** Finish waiting 1 state (FIN sent, nothing received for it yet) */
private final static int FIN_WAIT1 = 6;
/** Finish waiting 2 state (own FIN acknowledged) */
private final static int FIN_WAIT2 = 7;
/** Closing state (FIN received while in FIN_WAIT1) */
private final static int CLOSING = 8;
/** Timed waiting state (left after WAIT_LIMIT further PDUs) */
private final static int TIME_WAIT = 9;
/** Close waiting state (peer FIN acknowledged, local user not yet closed) */
private final static int CLOSE_WAIT = 10;
/** Last acknowledgement state (FIN sent, waiting for its ACK) */
private final static int LAST_ACK = 11;
// TCP segment control flag constants (re-exported from TCPMessage)
/** Urgent flag */
public final static int URG = TCPMessage.URG;
/** Acknowledgement flag */
public final static int ACK = TCPMessage.ACK;
/** Push flag */
public final static int PSH = TCPMessage.PSH;
/** Reset flag */
public final static int RST = TCPMessage.RST;
/** Synchronise flag */
public final static int SYN = TCPMessage.SYN;
/** Finish flag */
public final static int FIN = TCPMessage.FIN;
/** Flags value of a plain data segment, i.e. no control flag set */
private final static int DATA = 0;
// Other protocol constants
/** Maximum number of retransmission attempts (currently unused) */
private final static int RETRY_LIMIT = 10;
/**
Maximum number of PDUs to be received in TIME_WAIT state (the wait ought to
be 2 maximum segment lifetimes, but this is not meaningful as the simulation
does not use time)
*/
private final static int WAIT_LIMIT = 2;
// Message type and service name strings
/** Open confirmed message (type of the PDU passed to the user from SYN_RCVD) */
private final static String OPEN_CONFIRMED = TCPService.OPEN_CONFIRMED;
/** Prefix of the "send buffered data" service offered by getServices */
private final static String SEND = "Send";
/** Prefix of the "deliver buffered data" service offered by getServices */
private final static String DELIVER = "Deliver";
/** Prefix of the "retransmit on timeout" service offered by getServices */
private final static String RESEND = "Timeout - resend";
// Protocol variables
/** Maximum send packet size; larger packets are fragmented by sendToPeer */
private static int segmentSize;
/** Peer protocol entity */
private ProtocolEntity peer;
/** Protocol user (service entity) */
private TCPService user;
/** Protocol medium */
private Medium medium;
/** Protocol name */
private String name;
/** Events produced by the current performService/receivePDU call */
private Vector<ProtocolEvent> events;
/**
Events returned by the medium or the user when a PDU was last passed to them
(see transmitPDU); appended to events before these are returned
*/
private Vector<ProtocolEvent> userEvents;
/** Segments for timeout (sent but not yet fully acknowledged) */
private Vector<PDU> timedSegments;
/** Incoming segments not yet delivered to the user */
private Vector<PDU> recvBuffer;
/** Segments for sending once the available window allows */
private Vector<PDU> sendBuffer;
/** Protocol role (compared with TCP.PEER when a SYN arrives while closed) */
private int role;
/** Current state (one of the protocol state constants) */
private int state;
/** Previous state (-1 until one has been recorded) */
private int prevState = -1;
/** Next send sequence number */
private int sendSeq;
/** Acknowledgement sequence number to be sent */
private int sendAck;
/** Next expected sequence number */
private int recvSeq;
/** Acknowledgement sequence number received (-1 after initialisation) */
private int recvAck;
/** Local receive window advertised in outgoing segments */
private int recvWindow;
/** Peer receive window */
private int peerWindow;
/** Original peer initial receive window */
private int peerWindowOriginal;
/** Original peer initial receive sequence number */
private int peerSequenceOriginal;
/** Initial sequence number */
private int initSeq;
/** Local window size (the default that recvWindow is restored to) */
private int window;
/** Congestion window size */
private int congestionWindow;
/** Slow start threshold */
private int slowStartThreshold;
/** Sequence number associated with the data that can be delivered to the user */
private int deliverableSeq;
/** Size of the data that can be delivered to the user */
private int deliverableSize;
/** Whether protocol requests have previously been made */
private boolean laterRequest;
/** Number of PDUs received in TIME_WAIT state */
private int waitCount;
/** Number of octets sent but not acknowledged */
private int sentPending;
/** Retry count (currently unused) */
private int retryCount;
/**
  Create a TCP protocol entity and initialise it. The original peer window
  and peer sequence number are left at their default of zero.

  @param medium   protocol medium
  @param name     protocol name
  @param role     protocol role
  @param initSeq  initial sequence number
  @param window   initial window
*/
public TCPProtocol(Medium medium, String name, int role, int initSeq,
    int window) {
  // store the configuration first: initialise() reads initSeq and window
  this.medium = medium;
  this.name = name;
  this.role = role;
  this.window = window;
  this.initSeq = initSeq;
  initialise();
}
/**
  Create a TCP protocol entity for the slow start subtype and initialise it.
  The peer window and peer sequence number are remembered so that
  initialise() can restore them when slow start is in use.

  @param medium        protocol medium
  @param name          protocol name
  @param role          protocol role
  @param initSeq       initial sequence number
  @param window        initial window
  @param peerWindow    initial peer window
  @param peerSequence  initial peer sequence number
*/
public TCPProtocol(Medium medium, String name, int role, int initSeq,
    int window, int peerWindow, int peerSequence) {
  // store the configuration first: initialise() reads all of these
  this.peerSequenceOriginal = peerSequence;
  this.peerWindowOriginal = peerWindow;
  this.medium = medium;
  this.name = name;
  this.role = role;
  this.window = window;
  this.initSeq = initSeq;
  initialise();
}
/**
  Cancel retransmission of every timed segment that the given PDU
  acknowledges in full.

  The acknowledgement is honoured only if something is outstanding and the
  acknowledgement number lies beyond the oldest unacknowledged sequence
  number but not beyond the next send sequence number. In that case the
  acknowledgement number is recorded, each fully covered segment is dropped
  from the retransmission queue, and its size is deducted from the count of
  octets sent but not acknowledged.

  @param pdu  PDU whose acknowledgement is to be applied (ignored if null)
*/
private void cancelRetransmissions(TCPMessage pdu) {
  if (pdu == null)
    return;
  int una = oldestUnacked();                    // oldest unack'ed, -1 if none
  int ack = pdu.ack;                            // acknowledgement seq no
  if (una < 0 || ack <= una || ack > sendSeq)   // nothing new acknowledged
    return;
  recvAck = ack;                                // note ack seq no
  // remove through the iterator so the queue is scanned once and is never
  // modified behind the back of an active traversal
  for (Iterator<PDU> segments = timedSegments.iterator();
       segments.hasNext(); ) {
    PDU segment = segments.next();
    if (segment.seq + segment.size <= ack) {    // segment fully acknowledged
      segments.remove();
      sentPending -= segment.size;              // reduce amount sent pending
    }
  }
}
/**
Close the protocol by setting the CLOSED state. Nothing else is changed
here: buffers, sequence numbers and the medium are left as they are.
*/
public void closeProtocol() {
setState(CLOSED); // set state closed
}
/**
  Build a comment event originating from this protocol entity.

  @param comment  text of the comment
  @return         new protocol event of type COMMENT
*/
public ProtocolEvent comment(String comment) {
  ProtocolEvent commentEvent =
    new ProtocolEvent(ProtocolEvent.COMMENT, this, comment);
  return commentEvent;
}
/**
  If this entity is named "Protocol A", append to the protocol events
  (<var>events</var>) a comment giving the congestion window and whether it
  is below the slow start threshold ("exp.") or not ("lin."). Other entities
  add nothing.
*/
public void commentCongestion() {
  if (!name.equals("Protocol A"))               // only protocol A reports
    return;
  String windowIncrease;                        // window increase type
  if (congestionWindow < slowStartThreshold)
    windowIncrease = "exp.";
  else
    windowIncrease = "lin.";
  String text = "cwind " + congestionWindow + " (" + windowIncrease + ")";
  events.addElement(comment(text));             // add congestion comment
}
/**
  Return the name given to this protocol entity at construction.

  @return protocol name
*/
public String getName() {
  return name;
}
/**
  Return the services this protocol entity can currently perform.

  Up to three kinds of service are offered:
  a "Send" service when the send buffer is non-empty and the available
  window is positive; a "Deliver" service when the receive buffer starts at
  or before the next expected sequence number and holds a non-empty
  contiguous run of data; and a "Timeout - resend" service for each timed
  segment whose sequence number is the oldest unacknowledged one.

  As a side effect the receive buffer is sorted, and the deliverable
  sequence number and size are refreshed whenever the buffer starts at or
  before the next expected sequence number.

  @return protocol entity services
*/
public Vector<String> getServices() {
  Vector<String> services = new Vector<String>();

  if (!sendBuffer.isEmpty()) {                  // send buffer non-empty?
    int firstSize = sendBuffer.firstElement().size;
    int protocolWindow = getWindow();           // get window
    if (protocolWindow > 0) {                   // can send to peer?
      int size = Math.min(firstSize, protocolWindow);
      services.addElement(SEND + " " + size + " octets to peer");
    }
  }

  if (!recvBuffer.isEmpty()) {                  // receive buffer non-empty?
    sortBuffer();                               // ascending seq. nos.
    int firstSeq = recvBuffer.firstElement().seq;
    if (firstSeq <= recvSeq) {                  // starts with expected data?
      int seq = firstSeq;
      for (PDU pdu : recvBuffer) {              // measure the contiguous run
        if (pdu.seq != seq)
          break;
        seq += pdu.size;
      }
      int size = seq - firstSeq;
      if (size > 0)                             // offer to deliver to user
        services.addElement(DELIVER + " " + size + " octets to user");
      // recorded even when size is zero, as before
      deliverableSeq = firstSeq;
      deliverableSize = size;
    }
  }

  // offer a resend for the oldest segment that could have timed out; the
  // oldest sequence number cannot change inside the loop, so get it once
  int oldest = oldestUnacked();
  for (PDU pdu : timedSegments) {
    TCPMessage tcpdu = (TCPMessage) pdu;
    if (tcpdu.seq == oldest)
      services.addElement(RESEND + " " + tcpdu.getID());
  }
  return services;
}
/**
  Extract the integer that follows prefix p (and one separator character) in
  string s. The integer runs up to the next space, or to the end of the
  string when no space follows it.

  @param s  string, e.g. "Send 100 octets to peer"
  @param p  prefix, e.g. "Send"
  @return   extracted integer
  @throws NumberFormatException if the text after the prefix is not an integer
  @throws StringIndexOutOfBoundsException if s is shorter than the prefix
          plus its separator
*/
private int getSize(String s, String p) {
  String rest = s.substring(p.length() + 1);    // drop prefix and separator
  int end = rest.indexOf(' ');
  // a number that is the last token has no trailing space; take all of it
  String digits = end < 0 ? rest : rest.substring(0, end);
  return Integer.parseInt(digits);
}
/**
  Return the available window: the smaller of the peer window and the
  congestion window less the octets sent but not yet acknowledged. Outside
  slow start the congestion window is initialised to Integer.MAX_VALUE, so
  the peer window normally decides the result. The value is not clamped and
  may be zero or negative.

  @return available window
*/
private int getWindow() {
  int congestionAllowance = congestionWindow - sentPending;
  return Math.min(congestionAllowance, peerWindow);
}
/**
  Report whether the protocol uses a timer for the given PDU type. All TCP
  segments are of the same type, so the answer does not depend on it;
  setTimer decides per segment whether a timer is actually needed.

  @param type  type description (not used)
  @return      always true
*/
public boolean hasTimer(String type) {
  return true;
}
/**
  Initialise the protocol. Called by the constructors and again whenever the
  entity is reset, so everything that describes a connection is put back to
  the value a freshly constructed entity has.

  All event lists and buffers are replaced by new empty vectors. With slow
  start the entity begins in the ESTABLISHED state, using the peer window and
  peer sequence number supplied at construction and a congestion window of
  one segment. Otherwise it begins CLOSED with the congestion window and
  threshold set to Integer.MAX_VALUE so that they never constrain sending.
*/
public void initialise() {
  events = new Vector<ProtocolEvent>();         // empty events list
  userEvents = new Vector<ProtocolEvent>();     // empty user events list
  timedSegments = new Vector<PDU>();            // empty unack'ed segments
  sendBuffer = new Vector<PDU>();               // empty segments to send
  recvBuffer = new Vector<PDU>();               // empty received segments
  recvWindow = window;                          // init receive window size
  sendSeq = initSeq;                            // init send seq no
  recvAck = -1;                                 // init received ack seq no
  waitCount = 0;                                // init waiting PDU count
  sentPending = 0;                              // init sent pending count
  retryCount = 0;                               // init retry count
  // forget what the previous connection left behind; without this a
  // re-initialised entity could acknowledge or piggyback stale values
  sendAck = 0;                                  // nothing to acknowledge yet
  deliverableSeq = 0;                           // nothing deliverable yet
  deliverableSize = 0;
  prevState = -1;                               // no previous state
  if (TCP.isSlowStart()) {                      // slow start?
    setState(ESTABLISHED);                      // start as established
    congestionWindow = segmentSize;             // one segment to begin with
    slowStartThreshold = Integer.MAX_VALUE;     // no threshold yet
    laterRequest = false;                       // no request made yet
    peerWindow = peerWindowOriginal;            // initial peer window
    recvSeq = peerSequenceOriginal;             // initial peer seq. no.
  }
  else {                                        // client-server/peer-peer
    setState(CLOSED);                           // start as closed
    congestionWindow = Integer.MAX_VALUE;       // no congestion window
    slowStartThreshold = Integer.MAX_VALUE;     // no threshold
  }
}
/**
  Check whether the receive buffer already holds a segment that starts at
  the given sequence number.

  @param seq  sequence number
  @return     true if a buffered segment has this sequence number
*/
private boolean isDuplicate(int seq) {
  boolean found = false;
  for (PDU buffered : recvBuffer) {
    if (buffered.seq == seq) {
      found = true;
      break;
    }
  }
  return found;
}
/**
  Check whether a sequence number lies in the receive window, which starts
  at the next expected sequence number and spans the default local window
  size (not the currently advertised one).

  @param seq  sequence number
  @return     true if the sequence number is within the window
*/
private boolean isWithinWindow(int seq) {
  boolean beforeWindow = seq < recvSeq;
  boolean afterWindow = seq >= recvSeq + window;
  return !(beforeWindow || afterWindow);
}
/**
  Return the oldest unacknowledged sequence number, i.e. the lowest sequence
  number among the segments awaiting acknowledgement.

  @return lowest timed sequence number, or -1 if no segment is outstanding
*/
private int oldestUnacked() {
  boolean any = false;
  int lowest = -1;
  for (PDU segment : timedSegments) {
    if (!any || segment.seq < lowest) {
      lowest = segment.seq;
      any = true;
    }
  }
  return lowest;
}
/**
  Perform one of the services previously offered by getServices.

  A "Send" service transmits the stated number of octets from the first
  buffered segment. A "Deliver" service hands the deliverable data to the
  user, reopens the receive window and tells the peer. A
  "Timeout - resend" service retransmits the labelled segment; with slow
  start it first halves the threshold (but not below one segment) and
  shrinks the congestion window to one segment.

  @param service  service, as returned by getServices
  @return         resulting protocol events
*/
public Vector<ProtocolEvent> performService(String service) {
  events.removeAllElements();                   // forget previous events
  userEvents.removeAllElements();

  if (service.startsWith(SEND)) {               // send buffered data to peer
    int size = getSize(service, SEND);
    if (sendBuffer.isEmpty())
      System.err.println("cannot send as send buffer is empty");
    else {
      TCPMessage first = (TCPMessage) sendBuffer.firstElement();
      sendToPeer(first.flags, size);
      if (first.size > size)
        first.size -= size;                     // part of it still waits
      else
        sendBuffer.removeElement(first);
    }
  }
  else if (service.startsWith(DELIVER)) {       // deliver data to user
    int size = deliverableSize;
    int maxSeq = deliverableSeq + size;
    int delivered = 0;                          // leading PDUs to drop
    for (PDU buffered : recvBuffer) {
      if (buffered.seq > maxSeq)
        break;
      delivered++;
    }
    for (int i = 0; i < delivered; i++)
      recvBuffer.remove(0);
    recvWindow += size;                         // alter receive window
    sendToUser(TCPService.DELIVER + " (" + size + ")");
    deliverableSize = 0;
    sendToPeer(0);                              // open up window
  }
  else if (service.startsWith(RESEND)) {        // re-send on timeout
    if (TCP.isSlowStart()) {
      slowStartThreshold = Math.max(congestionWindow / 2, segmentSize);
      events.addElement(comment("ssthresh " + slowStartThreshold));
      congestionWindow = segmentSize;           // back to one segment
      commentCongestion();
    }
    // find the first timed segment carrying the label named by the service
    String label = service.substring(RESEND.length() + 1);
    TCPMessage expired = null;
    for (PDU timed : timedSegments) {
      TCPMessage candidate = (TCPMessage) timed;
      if (label.equals(candidate.getLabel())) {
        expired = candidate;
        break;
      }
    }
    if (expired != null) {
      transmitPDU(expired, peer);
      timedSegments.removeElement(expired);
      events.addElement(new ProtocolEvent(ProtocolEvent.TIMEOUT, expired));
    }
  }

  for (ProtocolEvent userEvent : userEvents)    // append events from below
    events.addElement(userEvent);
  return events;
}
/**
  Check for data waiting in the send buffer with the push flag set and, if
  the first buffered segment has it, send as much of that segment as the
  available window allows. Whatever could not be sent stays buffered.
*/
private void pushData() {
  if (sendBuffer.isEmpty())
    return;
  TCPMessage tcpdu = (TCPMessage) sendBuffer.firstElement();
  // the available window can be negative (e.g. more octets outstanding than
  // the congestion window allows); treat that as "nothing can be sent", or
  // the subtraction below would make the buffered segment grow
  int protocolWindow = Math.max(getWindow(), 0);
  int size = Math.min(tcpdu.size, protocolWindow);
  if (tcpdu.isPsh()) {
    sendToPeer(tcpdu.flags, size);
    if (tcpdu.size > size)
      tcpdu.size -= size;                       // remainder still waits
    else
      sendBuffer.removeElement(tcpdu);
  }
}
/**
  Accept a data segment from the peer if it overlaps the receive window and
  does not duplicate a segment already buffered; otherwise note it as
  ignored. An ACK is sent to the peer in both cases.

  For an accepted segment the receive buffer is re-sorted, the contiguous
  data starting at the head of the buffer is measured, and the
  acknowledgement number, next expected sequence number and receive window
  are updated from it. A segment without data is taken out of the buffer
  again once it has been acknowledged.

  @param pdu  received PDU
*/
private void receiveData(PDU pdu) {
  int seq = pdu.seq;
  int size = pdu.size;
  boolean acceptable =
    (isWithinWindow(seq) || isWithinWindow(seq + size - 1)) &&
    !isDuplicate(seq);
  if (!acceptable) {                            // acknowledge duplicate
    events.addElement(comment("ignored"));      // add ignored comment
    sendToPeer(ACK);                            // respond with ACK
    return;
  }

  recvBuffer.addElement(pdu);                   // store PDU in buffer
  sortBuffer();                                 // ascending seq. nos.
  int firstSeq = recvBuffer.firstElement().seq;
  if (firstSeq <= recvSeq) {
    deliverableSeq = firstSeq;
    // a separate loop variable keeps "pdu" referring to the received
    // segment, which is needed again below
    for (PDU buffered : recvBuffer) {
      if (buffered.seq > deliverableSeq)
        break;                                  // gap: stop at missing data
      else if (buffered.seq == deliverableSeq)
        deliverableSeq += buffered.size;
    }
    deliverableSize = deliverableSeq - firstSeq;
  }
  else {                                        // expected data missing
    deliverableSeq = recvSeq;
    deliverableSize = 0;
  }
  sendAck = deliverableSeq;                     // update ack seq no
  recvSeq = deliverableSeq;                     // update exp. seq no
  recvWindow = window - deliverableSize;        // update receive window
  sendToPeer(ACK);                              // respond with ACK
  if (size == 0)                                // PDU contains no data?
    recvBuffer.removeElement(pdu);              // remove the received PDU
  // must call getServices to ensure a buffer check when undoing
  getServices();
}
/**
  Receive a PDU from the peer (a TCPMessage) or an SDU from the user (any
  other PDU, identified by its type string) and act on it according to the
  current protocol state.

  For a TCPMessage the control flags are examined and the peer window is
  updated from the segment. For a service primitive the flags value is -1.
  Because -1 has every bit set, the bit tests such as
  <code>(flags &amp; RST) != 0</code> also succeed for primitives; the
  CLOSING state therefore checks for a primitive first, as it goes on to
  read the received segment.
  NOTE(review): other states still let primitives reach their flag tests;
  whether that is intended should be confirmed before changing it.

  @param pdu  PDU/SDU (a null PDU is ignored)
  @return     resulting protocol events
*/
public Vector<ProtocolEvent> receivePDU(PDU pdu) {
  events.removeAllElements();                   // forget previous events
  userEvents.removeAllElements();
  if (pdu == null)
    return events;
  String pduType = pdu.type;
  int flags = -1;                               // -1: not a TCP segment
  TCPMessage tcpduRecv = null;
  if (pdu instanceof TCPMessage) {
    tcpduRecv = (TCPMessage) pdu;
    flags = tcpduRecv.flags;
    peerWindow = tcpduRecv.win;
  }
  switch (state) {
    case CLOSED:                                // closed?
      if (pduType.equals(TCPService.ACTIVE_OPEN)) {
        // step 1 of three-way handshake
        sendToPeer(SYN);
        setState(SYN_SENT);
      }
      else if (pduType.equals(TCPService.PASSIVE_OPEN))
        setState(LISTEN);
      else if (flags == SYN && role == TCP.PEER) { // closed peer gets SYN?
        recvSeq = pdu.seq + pdu.size;           // note next seq no
        sendAck = recvSeq;                      // note seq no for ACK
        events.addElement(comment("open queued"));
        setState(SYN_PEND);                     // note SYN pending
      }
      else if ((flags & RST) != 0)              // reset?
        timedSegments = new Vector<PDU>();      // cancel retransmissions
      else                                      // stray segment
        reset(flags, pdu);                      // reset connection
      break;

    case LISTEN:                                // listening after passive open?
      if (flags == SYN) {
        // step 2 of three-way handshake
        recvSeq = pdu.seq + pdu.size;
        sendAck = recvSeq;
        sendToPeer(SYN + ACK);
        sendToUser(TCPService.OPEN_RECEIVED);
        prevState = LISTEN;
        setState(SYN_RCVD);
      }
      else if (flags == FIN) {
        sendAck = pdu.seq + 1;
        sendToPeer(ACK);
      }
      else if (pduType.equals(TCPService.CLOSE))
        closeProtocol();
      else if (pduType.equals(TCPService.SEND)) {
        sendToPeer(SYN);
        setState(SYN_SENT);
      }
      else {
        events.addElement(comment("ignored"));  // add ignored comment
        reset(flags, pdu);                      // reset connection
      }
      break;

    case SYN_SENT:                              // SYN sent?
      if (flags == SYN + ACK) {
        // step 3 of three-way handshake
        cancelRetransmissions(tcpduRecv);
        recvSeq = pdu.seq + pdu.size;
        sendAck = recvSeq;
        sendToPeer(ACK);
        events.addElement(comment("established"));
        sendToUser(TCPService.OPEN_SUCCESS);
        setState(ESTABLISHED);
      }
      else if (flags == SYN) {                  // SYN?
        recvSeq = pdu.seq + pdu.size;           // get next seq no
        sendAck = recvSeq;                      // set ack seq no
        sendSeq--;                              // reset send seq no
        sendToPeer(SYN + ACK);                  // SYN and ACK
        prevState = SYN_SENT;                   // previously SYN sent
        setState(SYN_RCVD);                     // now SYN received
      }
      else if ((flags & RST) != 0) {            // reset by peer?
        sendToUser(TCPService.OPEN_FAILURE);    // report rejected
        timedSegments = new Vector<PDU>();      // cancel retrans
        closeProtocol();                        // now closed
      }
      else if ((flags & FIN) != 0) {            // finish by peer?
        sendToUser(TCPService.OPEN_FAILURE);    // report rejected
        timedSegments = new Vector<PDU>();      // cancel retrans
        reset(flags, pdu);                      // reset connection
        closeProtocol();                        // now closed
      }
      else if (pduType.equals(TCPService.CLOSE)) // user close?
        closeProtocol();                        // now closed
      else
        events.addElement(comment("ignored"));  // add ignored comment
      break;

    case SYN_RCVD:                              // SYN received?
      if (flags == ACK || flags == SYN + ACK) {
        // cancel timer on previously sent SYN + ACK segment
        cancelRetransmissions(tcpduRecv);
        events.addElement(comment("established"));
        if (prevState == LISTEN)                // passive open confirm?
          transmitPDU(new PDU(OPEN_CONFIRMED), user);
        if (prevState == SYN_SENT)              // was SYN sent?
          sendToUser(TCPService.OPEN_SUCCESS);
        if (prevState == SYN_PEND)              // was SYN pending?
          sendToUser(TCPService.OPEN_SUCCESS);
        setState(ESTABLISHED);
      }
      else if ((flags & FIN) != 0) {            // other user closed?
        sendToUser(TCPService.OPEN_FAILURE);    // report open failure
        timedSegments = new Vector<PDU>();      // cancel retrans
        recvSeq = pdu.seq + pdu.size;           // set next seq expected
        sendAck = recvSeq;                      // set ack seq no
        sendToPeer(FIN + ACK);                  // acknowledge FIN
        prevState = SYN_RCVD;                   // was SYN received
        setState(LAST_ACK);                     // waiting for FIN ack
      }
      else if (pduType.equals(TCPService.CLOSE)) {
        sendToPeer(FIN);
        setState(FIN_WAIT1);                    // wait for FIN ack
      }
      else if (flags == SYN) {
        // duplicate SYN segment (due to retransmission)
        timedSegments.removeAllElements();
        sendSeq--;                              // restore seq no
        peerWindow++;                           // restore peer window
        sendToPeer(SYN + ACK);                  // resend SYN + ACK
      }
      else
        events.addElement(comment("ignored"));  // add ignored comment
      break;

    case SYN_PEND:                              // SYN for closed peer?
      if (pduType.equals(TCPService.ACTIVE_OPEN)) { // active open?
        sendToPeer(SYN + ACK);                  // SYN, ACK for prev SYN
        prevState = SYN_PEND;                   // was SYN pending state
        setState(SYN_RCVD);                     // now SYN rcvd. state
      }
      else if ((flags & RST) != 0)              // reset?
        closeProtocol();                        // back to closed
      else                                      // not active open/reset
        events.addElement(comment("ignored"));  // add ignored comment
      break;

    case ESTABLISHED:                           // established?
      int seq = pdu.seq;                        // seq. no. first octet
      int size = pdu.size;                      // received length
      if (pduType.startsWith(TCPService.SEND)) { // user data request
        if (TCP.isSlowStart()) {                // slow start?
          if (!laterRequest) {                  // first request?
            laterRequest = true;                // note now later request
            commentCongestion();                // add congestion comment
          }
        }
        transmitData(pduType, size);
        break;
      }
      else if (pduType.equals(TCPService.CLOSE)) { // user close
        sendToPeer(FIN);
        prevState = ESTABLISHED;
        setState(FIN_WAIT1);
        break;
      }
      if (tcpduRecv == null)                    // not a TCP segment?
        break;                                  // just in case
      else if (flags == DATA ||                 // DATA received or
               (flags == DATA + ACK && size > 0)) { // DATA and ACK received?
        if ((flags & ACK) != 0)                 // ACK?
          updateWindow();                       // update congestion window
        receiveData(pdu);
      }
      else if (tcpduRecv.isPsh()) {             // push data to user at once
        if (isWithinWindow(seq) || isWithinWindow(seq + size - 1)) {
          recvBuffer.addElement(pdu);           // buffer to user, ack
          sortBuffer();
          seq = ((PDU) recvBuffer.firstElement()).seq;
          PDU lastPDU = (PDU) recvBuffer.lastElement();
          size = lastPDU.seq + lastPDU.size - seq;
          sendAck = seq + size;
          recvSeq += size;                      // update exp seq no
          recvBuffer.removeAllElements();
          recvWindow = window;
          sendToPeer(ACK);                      // send ACK to peer
          sendToUser(TCPService.DELIVER + " (" + size + ")");
        }
        else {                                  // ack duplicate
          sendAck = pdu.seq + pdu.size;
          sendToPeer(ACK);
        }
      }
      else if (flags == SYN + ACK) {            // duplicate SYN with ACK?
        cancelRetransmissions(tcpduRecv);       // ack from peer
        sendAck = pdu.seq + pdu.size;           // get next ack seq no
        sendToPeer(ACK);                        // acknowledge again
      }
      else if (flags == FIN) {                  // FIN received?
        sendAck = pdu.seq + 1;
        sendToPeer(ACK);
        sendToUser(TCPService.CLOSING);
        setState(CLOSE_WAIT);
      }
      else if ((flags & RST) != 0) {            // RST received?
        if (TCP.isSlowStart())                  // slow start?
          initialise();                         // re-initialise protocol
        // client-server/peer-peer: nothing is done (open question in the
        // original code as to what should happen here)
      }
      else if ((flags & ACK) != 0) {            // ACK received?
        updateWindow();                         // update congestion window
        cancelRetransmissions(tcpduRecv);       // ack from peer
        pushData();                             // push data to peer
        int oldest = oldestUnacked();           // get oldest unacknowledged
        if (oldest != -1) {                     // some data pending?
          int pending = sendSeq - oldest;       // octets still pending
          peerWindow -= pending;                // reduce window by pending
        }
      }
      break;

    case FIN_WAIT1:                             // FIN sent?
      if (flags == DATA || (flags == DATA + ACK && pdu.size > 0))
        receiveData(pdu);
      else if (flags == ACK) {
        cancelRetransmissions(tcpduRecv);
        setState(FIN_WAIT2);
      }
      else if ((flags & SYN) != 0)              // duplicate SYN?
        sendToPeer(ACK);                        // acknowledge it
      else if (flags == FIN &&                  // FIN?
               pdu.seq == recvSeq) {            // expected seq no?
        sendAck = pdu.seq + 1;                  // get seq no to ack
        sendToPeer(ACK);                        // acknowledge FIN
        setState(CLOSING);                      // now closing
      }
      else if (flags == FIN + ACK) {            // FIN and ACK?
        sendAck = pdu.seq + 1;                  // get seq no to ack
        cancelRetransmissions(tcpduRecv);       // cancel retrans
        sendToPeer(ACK);                        // acknowledge FIN
        if (prevState == ESTABLISHED)           // was established?
          sendToUser(TCPService.CLOSED);        // report closed
        closeProtocol();                        // now closed
      }
      else if ((flags & RST) != 0) {            // reset?
        cancelRetransmissions(tcpduRecv);       // cancel retrans
        sendToUser(TCPService.CLOSED);          // report closed
        closeProtocol();
      }
      else
        events.addElement(comment("ignored"));  // add ignored comment
      break;

    case FIN_WAIT2:                             // own FIN acknowledged?
      if (flags == DATA)                        // DATA?
        receiveData(pdu);
      else if (flags == ACK) {                  // ACK?
        cancelRetransmissions(tcpduRecv);       // cancel retrans
        setState(FIN_WAIT2);
      }
      else if (flags == SYN + ACK)              // SYN and ACK?
        // duplicate segment due to retransmission
        sendToPeer(ACK);
      else if (flags == FIN) {                  // FIN?
        if (sendAck == pdu.seq) {               // all segments received?
          sendAck = pdu.seq + 1;
          if (deliverableSize > 0)              // data to deliver?
            sendToUser(TCPService.DELIVER + " (" + deliverableSize + ")");
          recvWindow = window;                  // open receive window
          recvBuffer.removeAllElements();       // empty receive buffer
          sendToPeer(ACK);                      // send ack
          sendToUser(TCPService.CLOSED);        // notify user closed
          closeProtocol();                      // note state closed
        }
        else {                                  // outstanding segments
          sendToPeer(ACK);                      // send ack
          setState(TIME_WAIT);                  // set time wait state
        }
      }
      else if ((flags & RST) != 0 ||            // reset or
               (flags & SYN) != 0) {            // synchronise?
        cancelRetransmissions(tcpduRecv);       // cancel retrans
        sendToUser(TCPService.CLOSED);          // report closed
        closeProtocol();
      }
      else
        events.addElement(comment("ignored"));  // add ignored comment
      break;

    case CLOSING:                               // closing?
      if (tcpduRecv == null)
        // a service primitive has flags -1, which would pass the bit tests
        // below and then fail on the missing segment; ignore it instead
        events.addElement(comment("ignored"));  // add ignored comment
      else if (flags == ACK ||                  // finish ack or
               (flags & RST) != 0) {            // reset?
        cancelRetransmissions(tcpduRecv);       // cancel retrans
        if (tcpduRecv.ack == sendSeq) {         // all segments acknowledged?
          sendToUser(TCPService.CLOSED);        // report closed
          closeProtocol();                      // set closed state
        }
        else                                    // enter final wait
          setState(TIME_WAIT);                  // set time wait state
      }
      else if ((flags & SYN) != 0) {            // synchronise?
        // (a reset never gets here: it is handled by the branch above)
        cancelRetransmissions(tcpduRecv);       // cancel retransmission
        sendToUser(TCPService.CLOSED);          // report closed
        closeProtocol();
      }
      else
        events.addElement(comment("ignored"));  // add ignored comment
      break;

    case CLOSE_WAIT:                            // waiting to close?
      if (pduType.equals(TCPService.CLOSE)) {
        sendToPeer(FIN);
        setState(LAST_ACK);
      }
      if (pduType.startsWith(TCPService.SEND))  // user data request
        transmitData(pduType, pdu.size);
      if (tcpduRecv != null && tcpduRecv.isAck()) {
        // acknowledgement from peer
        cancelRetransmissions(tcpduRecv);       // cancel retrans
        pushData();
      }
      if (flags == DATA || (flags == DATA + ACK && pdu.size > 0))
        receiveData(pdu);
      if (flags == FIN) {
        sendAck = pdu.seq + 1;
        sendToPeer(ACK);
      }
      break;

    case LAST_ACK:                              // last ACK expected?
      if (flags == -1) {                        // primitive (open)?
        // deliberately nothing: primitives are not acted on in this state
      }
      else if (flags == ACK) {                  // ACK?
        cancelRetransmissions(tcpduRecv);       // cancel retrans
        if (timedSegments.size() == 0) {        // queue now empty?
          sendToUser(TCPService.CLOSED);        // report closed
          closeProtocol();                      // now closed
        }
      }
      else if (flags == FIN) {                  // FIN?
        sendAck = pdu.seq + 1;
        sendToPeer(ACK);
      }
      else if (flags == SYN) {                  // new SYN from peer?
        sendToUser(TCPService.CLOSED);          // report rejected
        timedSegments = new Vector<PDU>();      // cancel retrans
        reset(flags, pdu);                      // reset connection
        closeProtocol();                        // set closed
      }
      else if ((flags & RST) != 0) {            // reset?
        cancelRetransmissions(tcpduRecv);       // cancel retrans
        sendToUser(TCPService.CLOSED);          // report closed
        closeProtocol();                        // set closed
      }
      else
        events.addElement(comment("ignored"));  // add ignored comment
      break;

    case TIME_WAIT:                             // final timed waiting?
      waitCount++;                              // increment wait count
      if (waitCount >= WAIT_LIMIT) {            // wait limit reached?
        cancelRetransmissions(tcpduRecv);       // cancel retrans
        sendToUser(TCPService.CLOSED);          // report closed
        closeProtocol();                        // set closed
        medium.initialise();                    // re-initialise medium
      }
      break;
  }
  for (ProtocolEvent userEvent : userEvents)    // append events from below
    events.addElement(userEvent);
  return events;
} // end of receivePDU
/**
  Reset the connection by sending a reset segment to the peer.

  If the offending segment carried an ACK, the reset takes over the next
  expected sequence number as its own sequence number. Otherwise the reset
  uses sequence number 0 and acknowledges the offending segment.

  @param flags  protocol flags of the offending PDU
  @param pdu    offending PDU
*/
private void reset(int flags, PDU pdu) {
  boolean ackSet = (flags & ACK) != 0;
  if (!ackSet) {                                // ACK flag unset
    sendSeq = 0;                                // default 0 seq no
    recvSeq = pdu.seq + pdu.size;               // note next seq no
    sendAck = recvSeq;                          // note seq no for ACK
    sendToPeer(RST + ACK);                      // reset peer
    return;
  }
  sendSeq = recvSeq;                            // use incoming seq no
  sendToPeer(RST);                              // reset peer
}
/**
  Send a segment with the given flags to the peer.

  A pure window change (flags 0) or a pure ACK is sent with size 0, so the
  send sequence number does not move. Any other flag combination is sent
  with size 1 through the two-argument variant.

  @param flags  protocol flags
*/
private void sendToPeer(int flags) {
  boolean windowChange = flags == 0;
  boolean pureAck = flags == ACK;
  if (!windowChange && !pureAck) {              // other - seq. 1
    sendToPeer(flags, 1);
    return;
  }
  TCPMessage tcpdu = new TCPMessage(sendSeq, sendAck, flags, 0);
  tcpdu.setWindowSize(recvWindow);
  if (windowChange) {                           // transmit, then record
    transmitPDU(tcpdu, peer);
    events.addElement(new ProtocolEvent(ProtocolEvent.TRANSMIT, tcpdu));
  }
  else {                                        // record, then transmit
    events.addElement(new ProtocolEvent(ProtocolEvent.TRANSMIT, tcpdu));
    transmitPDU(tcpdu, peer);
  }
}
/**
  Send a packet of the given size to the peer with the given flags,
  fragmenting it into segments of at most the segment size. Each segment
  advances the send sequence number, reduces the peer window and adds to the
  count of octets sent but not acknowledged. Once a segment has carried the
  ACK flag, that flag is taken out for the remaining segments. A packet
  size of zero or less sends nothing.

  @param flags       protocol flags
  @param packetSize  packet size
*/
private void sendToPeer(int flags, int packetSize) {
  int offset = 0;
  while (offset < packetSize) {
    int size = Math.min(packetSize - offset, segmentSize);
    TCPMessage tcpdu = new TCPMessage(sendSeq, sendAck, flags, size);
    if (tcpdu.isAck())
      flags -= ACK;                             // ACK only in first
    tcpdu.setWindowSize(recvWindow);
    events.addElement(new ProtocolEvent(ProtocolEvent.TRANSMIT, tcpdu));
    transmitPDU(tcpdu, peer);
    sendSeq += size;
    peerWindow -= size;
    sentPending += size;                        // more sent data pending
    offset += segmentSize;
  }
}
/**
  Pass a new SDU of the given type to the user and record its delivery as a
  protocol event.

  @param type  SDU type
*/
private void sendToUser(String type) {
  PDU sdu = new PDU(type);
  transmitPDU(sdu, user);
  ProtocolEvent delivery = new ProtocolEvent(ProtocolEvent.DELIVER, sdu);
  events.addElement(delivery);
}
/**
Set protocol peer. PDUs whose destination is this entity are handed to the
medium by transmitPDU.
@param peer protocol peer
*/
public void setPeer(ProtocolEntity peer) {
this.peer = peer;
}
/**
Set segment size (maximum send packet size). The value is static, so it is
shared by all TCP protocol entities.
@param size maximum send packet size
*/
public static void setSegmentSize(int size) {
segmentSize = size;
}
/**
  Set the protocol state, reporting the change on the standard error stream
  when debugging is enabled.

  @param state  new protocol state
*/
public void setState(int state) {
  if (DEBUG) {
    String report = "state (" + name + "): " + state;
    System.err.println(report);
  }
  this.state = state;
}
/**
  Start timing a PDU by adding it to the segments awaiting acknowledgement.
  Nothing is added unless the timer is enabled, the PDU has a positive
  size, its sequence number is not below the last acknowledgement number
  received, and it does not carry the reset flag.

  @param pdu      PDU (must be a TCPMessage when the other conditions hold)
  @param enabled  whether timer is enabled
*/
public void setTimer(PDU pdu, boolean enabled) {
  if (!enabled)                                 // timer switched off
    return;
  if (pdu.size <= 0)                            // nothing to acknowledge
    return;
  if (pdu.seq < recvAck)                        // already acknowledged
    return;
  if (((TCPMessage) pdu).isRst())               // resets are not timed
    return;
  timedSegments.addElement(pdu);
}
/**
Set the protocol user. The argument is cast to TCPService, so passing any
other kind of protocol entity raises a ClassCastException.
@param user protocol user (service entity)
*/
public void setUser(ProtocolEntity user) {
this.user = (TCPService) user;
}
/**
Set the protocol window size, i.e. the receive window advertised in
outgoing segments. The default window size is left unchanged.
@param winSize window size
*/
public void setWindowSize(int winSize) {
recvWindow = winSize;
}
/**
  Set the protocol default window size, and make the receive window
  advertised in outgoing segments the same size.

  @param winSize  default window size
*/
public void setWindowSizeDefault(int winSize) {
  this.recvWindow = winSize;
  this.window = winSize;
}
/**
  Sort the receive buffer into ascending order of sequence number. Segments
  with the same sequence number keep their relative order.
*/
private void sortBuffer() {
  if (recvBuffer.size() < 2)                    // nothing to put in order
    return;
  Collections.sort(recvBuffer, new Comparator<PDU>() {
    public int compare(PDU first, PDU second) {
      // compare explicitly rather than subtract, which could overflow
      if (first.seq < second.seq)
        return -1;
      return first.seq > second.seq ? 1 : 0;
    }
  });
}
/**
  Transmit user data to the peer as far as the available window allows.

  If all of the data fits, it is sent at once, with an ACK piggybacked when
  there is deliverable data to acknowledge. Otherwise as much as the window
  allows is sent, the rest is placed in the send buffer, and a "buffered"
  comment is added to the events.

  @param pduType  PDU type (requests the push flag if it contains the push
                  marker after its first character)
  @param size     PDU size
*/
private void transmitData(String pduType, int size) {
  int sendFlags;                                // PUSH flag to be set?
  if (pduType.indexOf(TCPService.PUSH) > 0)
    sendFlags = PSH;
  else
    sendFlags = DATA;
  int protocolWindow = getWindow();             // get window

  if (size > protocolWindow) {                  // can send only some
    int remaining = size;
    if (protocolWindow > 0) {                   // send as much as possible
      remaining -= protocolWindow;
      sendToPeer(sendFlags, protocolWindow);
    }
    // ... and buffer the rest until the peer has space for it
    TCPMessage tcpdu =
      new TCPMessage(sendSeq, sendAck, sendFlags, remaining);
    tcpdu.setWindowSize(recvWindow);
    sendBuffer.addElement(tcpdu);
    events.addElement(comment("buffered"));     // add buffered comment
    return;
  }

  if (deliverableSize > 0) {                    // piggyback ACK on data
    sendAck = deliverableSeq + deliverableSize;
    sendFlags += ACK;
  }
  sendToPeer(sendFlags, size);                  // can send all the data
}
/**
  Transmit a PDU. The PDU is stamped with this entity as source and with the
  given destination. A PDU for the peer is handed to the medium; a PDU for
  any other destination is handed to the user. The events returned by the
  recipient replace the current user events.

  @param pdu          PDU
  @param destination  destination
*/
public void transmitPDU(PDU pdu, ProtocolEntity destination) {
  pdu.setSource(this);                          // set PDU source
  pdu.setDestination(destination);              // set PDU destination
  if (destination != peer)                      // sending to service
    userEvents = user.receivePDU(pdu);
  else {                                        // sending to peer
    userEvents = medium.receivePDU(pdu);
  }
}
/**
  Update the congestion window on an ACK when slow start is in use: the
  window grows by one segment size and a congestion comment is added.
  Without slow start nothing happens.
  NOTE(review): the increment is the same whether or not the window has
  reached the slow start threshold, although commentCongestion labels the
  latter case "lin."; confirm whether a smaller increase was intended there.
*/
public void updateWindow() {
  if (!TCP.isSlowStart())                       // not slow start
    return;
  congestionWindow = congestionWindow + segmentSize;
  commentCongestion();                          // add congestion comment
}
}