* i2psnark: Refactor request tracking to prevent possible deadlocks

This commit is contained in:
zzz
2011-01-05 20:21:29 +00:00
parent 6822bf7978
commit e826ce723a
4 changed files with 49 additions and 50 deletions

View File

@ -431,6 +431,7 @@ public class Peer implements Comparable
/**
* Are we currently requesting the piece?
* @deprecated deadlocks
* @since 0.8.1
*/
boolean isRequesting(int p) {

View File

@ -591,9 +591,6 @@ public class PeerCoordinator implements PeerListener
}
if (piece == null)
wantedSize = wantedPieces.size();
} // synch
// Don't sync the following, deadlock from calling each Peer's isRequesting()
//Only request a piece we've requested before if there's no other choice.
if (piece == null) {
@ -612,22 +609,12 @@ public class PeerCoordinator implements PeerListener
Piece p = it2.next();
if (havePieces.get(p.getId())) {
// limit number of parallel requests
int requestedCount = 0;
for (Peer pr : peers) {
// deadlock if synced on wantedPieces
if (pr.isRequesting(p.getId())) {
if (pr.equals(peer)) {
// don't give it to him again
requestedCount = MAX_PARALLEL_REQUESTS;
break;
}
if (++requestedCount >= MAX_PARALLEL_REQUESTS)
break;
}
}
if (requestedCount >= MAX_PARALLEL_REQUESTS)
continue;
int requestedCount = p.getRequestCount();
if (requestedCount < MAX_PARALLEL_REQUESTS &&
!p.isRequestedBy(peer)) {
piece = p;
break;
}
}
}
if (piece == null) {
@ -648,9 +635,10 @@ public class PeerCoordinator implements PeerListener
if (record) {
if (_log.shouldLog(Log.INFO))
_log.info(peer + " is now requesting: piece " + piece + " priority " + piece.getPriority());
piece.setRequested(true);
piece.setRequested(peer, true);
}
return piece.getId();
} // synch
}
/**
@ -948,7 +936,7 @@ public class PeerCoordinator implements PeerListener
}
} // else drop the empty partial piece
// synchs on wantedPieces...
markUnrequestedIfOnlyOne(peer, pp.getPiece());
markUnrequested(peer, pp.getPiece());
}
if (_log.shouldLog(Log.INFO))
_log.info("Partial list size now: " + partialPieces.size());
@ -974,7 +962,7 @@ public class PeerCoordinator implements PeerListener
// this is just a double-check, it should be in there
for(Piece piece : wantedPieces) {
if (piece.getId() == savedPiece) {
piece.setRequested(true);
piece.setRequested(peer, true);
if (_log.shouldLog(Log.INFO)) {
_log.info("Restoring orphaned partial piece " + pp +
" Partial list size now: " + partialPieces.size());
@ -1053,33 +1041,16 @@ public class PeerCoordinator implements PeerListener
}
}
/** Clear the requested flag for a piece if the peer
** is the only one requesting it
/**
* Clear the requested flag for a piece
*/
private void markUnrequestedIfOnlyOne(Peer peer, int piece)
private void markUnrequested(Peer peer, int piece)
{
// see if anybody else is requesting
for (Peer p : peers) {
if (p.equals(peer))
continue;
if (p.state == null)
continue;
// FIXME don't go into the state
if (p.state.getRequestedPieces().contains(Integer.valueOf(piece))) {
if (_log.shouldLog(Log.DEBUG))
_log.debug("Another peer is requesting piece " + piece);
return;
}
}
// nobody is, so mark unrequested
synchronized(wantedPieces)
{
for (Piece pc : wantedPieces) {
if (pc.getId() == piece) {
pc.setRequested(false);
if (_log.shouldLog(Log.DEBUG))
_log.debug("Removing from request list piece " + piece);
pc.setRequested(peer, false);
return;
}
}

View File

@ -464,7 +464,7 @@ class PeerState implements DataLoader
/**
* @return all pieces we are currently requesting, or empty Set
*/
synchronized Set<Integer> getRequestedPieces() {
synchronized private Set<Integer> getRequestedPieces() {
Set<Integer> rv = new HashSet(outstandingRequests.size() + 1);
for (Request req : outstandingRequests) {
rv.add(Integer.valueOf(req.piece));
@ -564,6 +564,7 @@ class PeerState implements DataLoader
/**
* Are we currently requesting the piece?
* @deprecated deadlocks
* @since 0.8.1
*/
synchronized boolean isRequesting(int piece) {

View File

@ -12,13 +12,15 @@ class Piece implements Comparable {
private int id;
private Set<PeerID> peers;
private boolean requested;
/** @since 0.8.3 */
private Set<PeerID> requests;
/** @since 0.8.1 */
private int priority;
public Piece(int id) {
this.id = id;
this.peers = new ConcurrentHashSet();
this.peers = new ConcurrentHashSet(I2PSnarkUtil.MAX_CONNECTIONS);
this.requests = new ConcurrentHashSet(2);
}
/**
@ -49,12 +51,36 @@ class Piece implements Comparable {
}
public int getId() { return this.id; }
/** @deprecated unused */
public Set<PeerID> getPeers() { return this.peers; }
public boolean addPeer(Peer peer) { return this.peers.add(peer.getPeerID()); }
public boolean removePeer(Peer peer) { return this.peers.remove(peer.getPeerID()); }
public boolean isRequested() { return this.requested; }
public void setRequested(boolean requested) { this.requested = requested; }
public boolean isRequested() { return !this.requests.isEmpty(); }
/**
* Since 0.8.3, keep track of who is requesting here,
* to avoid deadlocks from querying each peer.
*/
public void setRequested(Peer peer, boolean requested) {
if (requested)
this.requests.add(peer.getPeerID());
else
this.requests.remove(peer.getPeerID());
}
/**
* Is peer requesting this piece?
* @since 0.8.3
*/
public boolean isRequestedBy(Peer peer) {
return this.requests.contains(peer.getPeerID());
}
/**
* How many peers are requesting this piece?
* @since 0.8.3
*/
public int getRequestCount() {
return this.requests.size();
}
/** @return default 0 @since 0.8.1 */
public int getPriority() { return this.priority; }