-
Notifications
You must be signed in to change notification settings - Fork 3.9k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Flow control proxy and tests #2067
Flow control proxy and tests #2067
Conversation
qLength = bandwidth * latency / 1000; // convert from ms to s | ||
chunkSize = Math.min(qLength, 1024); | ||
|
||
System.out.println("Started new proxy on port " + proxyPort); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
make this a logger statement
PTAL |
} | ||
} | ||
|
||
private class ClientReader implements Runnable { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I was surprised to see separate implementations for client and server. It looks like only the queue to use is different? Maybe pass in serverQueue
/clientQueue
as a constructor parameter?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I see there is incrementClientQSize
as well. Maybe there should be an additional class that has state for a particular direction and the Runnables?
@carl-mastrangelo PTAL |
import java.util.concurrent.LinkedBlockingQueue; | ||
import java.util.logging.Logger; | ||
|
||
public class TrafficControlProxy { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please make this final
PTAL. Tried to make most of the changes from the earlier commit, but there are still issues with synchronization and exception handling. I held off on some of the exception handling/shut down issues because I think it will change as the threading model changes. So right now, this is still a one shot proxy that doesn't shut down cleanly. |
accidentally pushed FlowControlTest.java, please ignore |
private int chunkSize; | ||
private int bandwidth; | ||
private long latency; | ||
private final LinkedBlockingQueue<Message> serverQueue = new LinkedBlockingQueue<Message>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just use BlockingQueue as the assigned type.
@mfcripps Added some more comments. |
import java.lang.reflect.Field; | ||
import java.lang.reflect.InvocationTargetException; | ||
import java.lang.reflect.Method; | ||
// import java.lang.annotation.Native; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed all unused imports
#1 feature request: shared class used for both client and server :-D |
byte[] request = new byte[nextChunk]; | ||
int readableRequestBytes = serverIn.read(request); | ||
long sendTime = System.nanoTime() + latency; | ||
serverQueue.offer(new Message(sendTime, request, readableRequestBytes)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Offer returns a boolean saying if it worked. I think you either need to check this value, or use the blocking take
method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Swapped to put, which should always succeed on a delay queue because delay queues are unbounded.
PTAL |
} | ||
|
||
private void incrementQueue(Endpoint endpoint, int delta) { | ||
if (endpoint == Endpoint.SERVER) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Instead of an enum, instead just put serverQueueSize, serverQueue, etc, in a class along with these methods and the Runnables.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Put all methods that interact with the DelayQueue in a new class MessageQueue, but I left the Reader and Writer runnables as their own classes that take a MessageQueue as an argument.
PTAL |
clientAcceptor.close(); | ||
clientSock.close(); | ||
serverSock.close(); | ||
for (Thread worker : workers) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do another loop over the workers and join() on them? It seems like reset()
requires all the threads to be exited by the time this method returns.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
makes sense, added a loop to call join
Some minor comments, but LGTM. That separate class makes it much nicer. |
Modified to work with integration tests - |
84d1ffe
to
a3ae654
Compare
no rate limiting or delay yet.