package tracker; import tools.ServeErrors; import tools.HostItem; import tools.Logger; import tools.LogLevel; import java.util.Map; import java.util.HashMap; import java.util.ArrayList; import java.util.List; import protocolP2P.ProtocolP2PPacket; import protocolP2P.Payload; import protocolP2P.DiscoverRequest; import protocolP2P.DiscoverResponse; import protocolP2P.FileList; import protocolP2P.Unregister; import protocolP2P.Register; import protocolP2P.RequestResponseCode; import protocolP2P.RatioRequest; import protocolP2P.RatioResponse; import protocolP2P.UpdateRatio; import protocolP2P.SizeRequest; import protocolP2P.SizeResponse; import localException.InternalError; import remoteException.EmptyDirectory; import exception.LocalException; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicBoolean; /** Tracker management implementation * @author Louis Royer * @author Flavien Haas * @author JS Auge * @version 1.0 */ public abstract class TrackerManagement extends ServeErrors implements Runnable { protected HostItem tracker; protected Logger logger; protected List hostList = new ArrayList<>(); protected Map ratioUp = new HashMap<>(); protected Map ratioDown = new HashMap<>(); protected Map> fileList = new HashMap<>(); protected Map fileSize = new HashMap<>(); protected volatile boolean stop; protected AtomicBoolean writeLock; protected AtomicInteger readLock; /** Constructor * @param tracker Tracker HostItem * @param logger Logger */ public TrackerManagement(HostItem tracker, Logger logger) { stop = false; this.tracker = tracker; this.logger = logger; writeLock = new AtomicBoolean(); readLock = new AtomicInteger(); } /** Handle Discover request * @param pd Received request * @throws InternalError */ protected < T extends ProtocolP2PPacket > void handleDiscover(T pd) throws InternalError { Payload p = pd.getPayload(); assert p instanceof DiscoverRequest : "payload must be an instance of DiscoverRequest"; if (!(p instanceof DiscoverRequest)) { sendInternalError(pd); } else { String filename = ((DiscoverRequest)p).getFilename(); try { synchronized (this) { while(writeLock.get()) { this.wait(); } readLock.incrementAndGet(); pd.sendResponse(createProtocolP2PPacket(new DiscoverResponse(filename, fileList.getOrDefault(filename, hostList)))); readLock.decrementAndGet(); this.notify(); } } catch (InterruptedException e) { throw new InternalError(); } catch (Exception e) { writeLog(e, LogLevel.Error); } } } /** Handle Ratio request * @param pd Received request * @throws InternalError */ protected < T extends ProtocolP2PPacket > void handleRatio(T pd) throws InternalError { Payload p = pd.getPayload(); assert p instanceof RatioRequest : "payload must be an instance of RatioRequest"; if (!(p instanceof RatioRequest)) { sendInternalError(pd); } else { HostItem host = ((RatioRequest)p).getHostItem(); writeLog("Ratio request for host " + host, LogLevel.Debug); try { synchronized (this) { while(writeLock.get()) { this.wait(); } readLock.incrementAndGet(); if (!hostList.contains(host)) { String l = ""; for (HostItem h: hostList) { l += h + " "; } writeLog(host + " is not in hostlist [ " + l + "]", LogLevel.Debug); pd.sendResponse(createProtocolP2PPacket(new Payload(RequestResponseCode.UNKNOWN_HOST))); } else { pd.sendResponse(createProtocolP2PPacket(new RatioResponse(host, ratioUp.get(host).longValue(), ratioDown.get(host).longValue()))); } readLock.decrementAndGet(); this.notify(); } } catch (InterruptedException e) { throw new InternalError(); } catch (Exception e) { writeLog(e, LogLevel.Error); } } } /** Handle List Responses * @param pd Received response * @throws InternalError */ protected > void handleListResponse(T pd, HostItem host) throws InternalError { Payload p = pd.getPayload(); assert p instanceof FileList: "payload must be an instance of FileList"; if (!(p instanceof FileList)) { throw new InternalError(); } else { String[] f = ((FileList)p).getFileList(); try { synchronized(this) { while(writeLock.getAndSet(true)) { this.wait(); } while(readLock.get() > 0) { this.wait(); } for (String file: f) { List h = fileList.get(file); if (h != null) { if (!h.contains(host)) { h.add(host); } } else { List emptyH = new ArrayList<>(); emptyH.add(host); fileList.put(file, emptyH); try { writeLog("Registering new file : " + file + ". Sending size request ", LogLevel.Debug); ProtocolP2PPacket pSReq = createProtocolP2PPacket(new SizeRequest(file)); pSReq.sendRequest(getHostItemSocket(host)); SizeResponse pSResp = (SizeResponse)pSReq.receiveResponse().getPayload(); if (!pSResp.getFilename().equals(file)) { writeLog("Wrong filename for size response of " + file, LogLevel.Debug); throw new InternalError(); } else { writeLog("Registering size of file " + file, LogLevel.Debug); fileSize.put(file, Long.valueOf(pSResp.getTotalSize())); } } catch (Exception e) { writeLog("Error while asking for size of " + file, LogLevel.Error); fileList.remove(file); } } } writeLock.getAndSet(false); this.notifyAll(); } } catch(InterruptedException e) { throw new InternalError(); } } } /** Handle Unregistering * @param pd Request received * @throws InternalError */ protected < T extends ProtocolP2PPacket > void handleUnregister(T pd) throws InternalError { Payload p = pd.getPayload(); assert p instanceof Unregister : "payload must be an instance of Unregister"; if (!(p instanceof Unregister)) { sendInternalError(pd); throw new InternalError(); } HostItem host = ((Unregister)p).getHostItem(); writeLog("Received UNREGISTER from host " + pd.getHostItem() + ". Removing host " + host, LogLevel.Action); try { synchronized (this) { while(writeLock.getAndSet(true)) { this.wait(); } while(readLock.get() > 0) { this.wait(); } hostList.remove(host); Map> fileListCpy = new HashMap<>(fileList); for(String f: fileListCpy.keySet()) { fileList.get(f).remove(host); if(fileList.get(f).isEmpty()) { fileList.remove(f); fileSize.remove(f); } } writeLock.getAndSet(false); this.notifyAll(); } } catch (InterruptedException e) { throw new InternalError(); } /* Note: we do not remove host from ratioUp and ratioDown since Unregistering is only here to indicate * the host is temporary not serving any files (ie. is down). */ } /** Getter for HostItem socket * @param hostItem HostItem */ protected abstract Object getHostItemSocket(HostItem hostItem); /** Close HostItem socket * @param hostItem HostItem */ protected abstract void closeHostItemSocket(HostItem hostItem); /** Handle Update Ratio * @param pd Received request * @throws InternalError */ protected < T extends ProtocolP2PPacket > void handleUpdateRatio(T pd) throws InternalError { Payload p = pd.getPayload(); assert p instanceof UpdateRatio : "payload must be an instance of UpdateRatio"; if (!(p instanceof UpdateRatio)) { throw new InternalError(); } UpdateRatio updateRatio = (UpdateRatio) p; HostItem updateRatioServer = updateRatio.getServer(); HostItem updateRatioClient = updateRatio.getClient(); long ratioSize = updateRatio.getDataSize(); writeLog("Ratio += " + ratioSize + ", client: " + updateRatioClient + " / server: " + updateRatioServer, LogLevel.Debug); try { synchronized(this) { while(writeLock.getAndSet(true)) { this.wait(); } while(readLock.get() > 0) { this.wait(); } if (!ratioDown.containsKey(updateRatioClient)) { writeLog("Unkwnow client host " + updateRatioClient, LogLevel.Debug); sendUnknownHost(pd); } else if (!ratioUp.containsKey(updateRatioServer)) { writeLog("Unkwnow server host " + updateRatioServer, LogLevel.Debug); sendUnknownHost(pd); } else { ratioDown.put(updateRatioClient, Long.valueOf(ratioDown.get(updateRatioClient).longValue() + ratioSize)); ratioUp.put(updateRatioServer, Long.valueOf(ratioUp.get(updateRatioServer).longValue() + ratioSize)); } writeLock.getAndSet(false); this.notifyAll(); } } catch (InterruptedException e) { throw new InternalError(); } } /** Handle Registering * @param pd Received request * @throws InternalError */ protected < T extends ProtocolP2PPacket > void handleRegister(T pd) throws InternalError { Payload p = pd.getPayload(); assert p instanceof Register : "payload must be an instance of Register"; if (!(p instanceof Register)) { throw new InternalError(); } // add host to known host list HostItem host = ((Register)p).getHostItem(); try { synchronized (this) { while(writeLock.getAndSet(true)) { this.wait(); } while(readLock.get() > 0) { this.wait(); } if (!hostList.contains(host)) { hostList.add(host); } // initialize ratios if this is a new host ratioUp.putIfAbsent(host, Long.valueOf(0)); ratioDown.putIfAbsent(host, Long.valueOf(0)); writeLock.getAndSet(false); this.notifyAll(); } } catch(InterruptedException e) { throw new InternalError(); } // send a list request try { ProtocolP2PPacket pLReq = createProtocolP2PPacket(new Payload(RequestResponseCode.LIST_REQUEST)); pLReq.sendRequest(getHostItemSocket(host)); writeLog("Received REGISTER from host " + pd.getHostItem() + ". Adding host " + host + " to list. Sending List request", LogLevel.Action); handleListResponse(pLReq.receiveResponse(), host); writeLog("Received LIST RESPONSE from host " + pd.getHostItem(), LogLevel.Action); closeHostItemSocket(host); } catch (EmptyDirectory e) { writeLog("Empty Directory", LogLevel.Debug); } catch (Exception e) { // remove from list because list request could not be send try { synchronized (this) { while(writeLock.getAndSet(true)) { this.wait(); } while(readLock.get() > 0) { this.wait(); } if (!hostList.contains(host)) { hostList.add(host); } hostList.remove(host); writeLock.getAndSet(false); this.notifyAll(); } } catch(InterruptedException e2) { throw new InternalError(); } writeLog("Aborting the add of host " + host, LogLevel.Action); writeLog(e, LogLevel.Error); } } /** Handle List request * @param pd Received request * @throws InternalError */ protected < T extends ProtocolP2PPacket > void handleListRequest(T pd) throws InternalError { try { String[] l; synchronized (this) { while(writeLock.get()) { this.wait(); } readLock.incrementAndGet(); l = fileList.keySet().toArray(new String[0]); readLock.decrementAndGet(); this.notify(); } if (l.length == 0) { writeLog("Sending EMPTY_DIRECTORY to " + pd.getHostItem(), LogLevel.Action); sendEmptyDirectory(pd); } else { writeLog("Sending FILELIST to " + pd.getHostItem(), LogLevel.Action); pd.sendResponse(createProtocolP2PPacket(new FileList(l))); } } catch (InterruptedException e) { throw new InternalError(); } catch (Exception e) { writeLog(e, LogLevel.Error); } } /** Handle Size request * @param pd Received request * @throws InternalError */ protected < T extends ProtocolP2PPacket > void handleSizeRequest(T pd) throws InternalError { Payload p = pd.getPayload(); assert p instanceof SizeRequest : "payload must be an instance of SizeRequest"; if (!(p instanceof SizeRequest)) { throw new InternalError(); } String f = ((SizeRequest)p).getFilename(); try { Long s; synchronized (this) { while(writeLock.get()) { this.wait(); } readLock.incrementAndGet(); s = fileSize.get(f); readLock.decrementAndGet(); this.notify(); } if (s == null) { writeLog("Sending NOT FOUND for file " + f + " to " + pd.getHostItem(), LogLevel.Action); sendNotFound(pd); } else if (s.equals(Long.valueOf(0))) { writeLog("Sending EMPTY FILE for file " + f + " to " + pd.getHostItem(), LogLevel.Action); sendEmptyFile(pd); } else { writeLog("Sending SIZE RESPONSE for file " + f + " to " + pd.getHostItem(), LogLevel.Action); pd.sendResponse(createProtocolP2PPacket(new SizeResponse(f, s.longValue()))); } } catch (InterruptedException e) { throw new InternalError(); } catch (Exception e) { writeLog(e, LogLevel.Error); } } /** Handle requests * @throws LocalException */ protected > void handleRequest(T pd) throws LocalException { Payload p = pd.getPayload(); switch (p.getRequestResponseCode()) { case LOAD_REQUEST: writeLog("Received LOAD_REQUEST from host " + pd.getHostItem() + ", sending NOT_FOUND", LogLevel.Action); sendNotFound(pd); break; case LIST_REQUEST: writeLog("Received LIST_REQUEST from host " + pd.getHostItem(), LogLevel.Action); handleListRequest(pd); break; case HASH_REQUEST: writeLog("Received HASH_REQUEST from host " + pd.getHostItem() + ", sending NOT_FOUND", LogLevel.Action); sendNotFound(pd); break; case REGISTER: writeLog("Received REGISTER from host " + pd.getHostItem(), LogLevel.Debug); handleRegister(pd); break; case UNREGISTER: writeLog("Received UNREGISTER from host " + pd.getHostItem(), LogLevel.Debug); handleUnregister(pd); break; case DISCOVER_REQUEST: writeLog("Received DISCOVER REQUEST from host " + pd.getHostItem(), LogLevel.Action); handleDiscover(pd); break; case RATIO_REQUEST: writeLog("Received RATIO REQUEST from host " + pd.getHostItem(), LogLevel.Action); handleRatio(pd); break; case UPDATE_RATIO: writeLog("Received UPDATE RATIO from host " + pd.getHostItem(), LogLevel.Action); handleUpdateRatio(pd); break; case SIZE_REQUEST: writeLog("Received SIZE REQUEST from host " + pd.getHostItem(), LogLevel.Action); handleSizeRequest(pd); break; default: writeLog("Received grabbage from host " + pd.getHostItem(), LogLevel.Action); sendInternalError(pd); break; } } /** Stop the thread */ public void setStop() { stop = true; } }