mirror of
				https://github.com/apache/cloudstack.git
				synced 2025-10-26 08:42:29 +01:00 
			
		
		
		
	
		
			
				
	
	
		
			402 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Java
		
	
	
	
	
	
			
		
		
	
	
			402 lines
		
	
	
		
			12 KiB
		
	
	
	
		
			Java
		
	
	
	
	
	
| // Licensed to the Apache Software Foundation (ASF) under one
 | |
| // or more contributor license agreements.  See the NOTICE file
 | |
| // distributed with this work for additional information
 | |
| // regarding copyright ownership.  The ASF licenses this file
 | |
| // to you under the Apache License, Version 2.0 (the
 | |
| // "License"); you may not use this file except in compliance
 | |
| // with the License.  You may obtain a copy of the License at
 | |
| //
 | |
| //   http://www.apache.org/licenses/LICENSE-2.0
 | |
| //
 | |
| // Unless required by applicable law or agreed to in writing,
 | |
| // software distributed under the License is distributed on an
 | |
| // "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 | |
| // KIND, either express or implied.  See the License for the
 | |
| // specific language governing permissions and limitations
 | |
| // under the License.
 | |
| package streamer;
 | |
| 
 | |
| /**
 | |
|  * Link to transfer data in bounds of single thread (synchronized transfer).
 | |
|  * Must not be used to send data to elements served in different threads.
 | |
|  */
 | |
| public class SyncLink implements Link {
 | |
| 
 | |
|     /**
 | |
|      * When null packet is pulled from source element, then make slight delay to
 | |
|      * avoid consuming of 100% of CPU in main loop in cases when link is pauses or
 | |
|      * source element cannot produce data right now.
 | |
|      */
 | |
|     protected static final long STANDARD_DELAY_FOR_EMPTY_PACKET = 10; // Milliseconds
 | |
| 
 | |
|     /**
 | |
|      * Delay for null packets in poll method when blocking is requested, in
 | |
|      * milliseconds.
 | |
|      */
 | |
|     protected long delay = STANDARD_DELAY_FOR_EMPTY_PACKET;
 | |
| 
 | |
|     /**
 | |
|      * Set to true to print debugging messages.
 | |
|      */
 | |
|     protected boolean verbose = System.getProperty("streamer.Link.debug", "false").equals("true");;
 | |
| 
 | |
|     /**
 | |
|      * ID of this link.
 | |
|      */
 | |
|     protected String id = null;
 | |
| 
 | |
|     /**
 | |
|      * Buffer with data to hold because link is paused, or data is pushed back.
 | |
|      */
 | |
|     protected ByteBuffer cacheBuffer = null;
 | |
| 
 | |
|     /**
 | |
|      * Size of expected packet. Data must be hold in link until full packet will
 | |
|      * be read.
 | |
|      */
 | |
|     protected int expectedPacketSize = 0;
 | |
| 
 | |
|     /**
 | |
|      * Number of packets and packet header transferred to element.
 | |
|      */
 | |
|     protected int packetNumber = 0;
 | |
| 
 | |
|     /**
 | |
|      * Set to true to hold all data in link until it will be set to false again.
 | |
|      */
 | |
|     protected boolean paused = false;
 | |
| 
 | |
|     /**
 | |
|      * Element to pull data from, when in pull mode.
 | |
|      */
 | |
|     protected Element source = null;
 | |
| 
 | |
|     /**
 | |
|      * Element to send data to in both pull and push modes.
 | |
|      */
 | |
|     protected Element sink = null;
 | |
| 
 | |
|     /**
 | |
|      * When in loop, indicates that loop must be stopped.
 | |
|      *
 | |
|      * @see run()
 | |
|      */
 | |
|     private boolean shutdown = false;
 | |
| 
 | |
|     /**
 | |
|      * Indicates that event STREAM_START is passed over this link, so main loop
 | |
|      * can be started to pull data from source element.
 | |
|      */
 | |
|     protected boolean start;
 | |
| 
 | |
|     /**
 | |
|      * Operate in pull mode.
 | |
|      */
 | |
|     protected boolean pullMode;
 | |
| 
 | |
|     public SyncLink() {
 | |
|     }
 | |
| 
 | |
|     public SyncLink(String id) {
 | |
|         this.id = id;
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public void pushBack(ByteBuffer buf) {
 | |
|         if (verbose)
 | |
|             System.out.println("[" + this + "] INFO: Buffer pushed back: " + buf + ".");
 | |
| 
 | |
|         if (cacheBuffer != null) {
 | |
|             ByteBuffer tmp = cacheBuffer.join(buf);
 | |
|             cacheBuffer.unref();
 | |
|             cacheBuffer = tmp;
 | |
|         } else {
 | |
|             cacheBuffer = buf;
 | |
|             cacheBuffer.ref();
 | |
|         }
 | |
| 
 | |
|         resetCursor();
 | |
|     }
 | |
| 
 | |
|     private void resetCursor() {
 | |
|         // Reset cursor
 | |
|         cacheBuffer.cursor = 0;
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public void pushBack(ByteBuffer buf, int lengthOfFullPacket) {
 | |
|         pushBack(buf);
 | |
|         expectedPacketSize = lengthOfFullPacket;
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public String toString() {
 | |
|         return "SyncLink(" + ((id != null) ? id + ", " : "") + source + ":" + sink + ")";
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Push data to sink. Call with null to push cached data.
 | |
|      */
 | |
|     @Override
 | |
|     public void sendData(ByteBuffer buf) {
 | |
|         if (!paused && pullMode)
 | |
|             throw new RuntimeException("[" + this + "] ERROR: link is not in push mode.");
 | |
| 
 | |
|         if (verbose)
 | |
|             System.out.println("[" + this + "] INFO: Incoming buffer: " + buf + ".");
 | |
| 
 | |
|         if (buf == null && cacheBuffer == null)
 | |
|             return;
 | |
| 
 | |
|         if (cacheBuffer != null && buf != null) {
 | |
|             // Join old data with fresh data
 | |
|             buf = cacheBuffer.join(buf);
 | |
|             cacheBuffer.unref();
 | |
|             cacheBuffer = buf;
 | |
|         }
 | |
| 
 | |
|         // Store buffer in cache field to simplify following loop
 | |
|         if (buf != null)
 | |
|             cacheBuffer = buf;
 | |
| 
 | |
|         // When data pushed back and length of data is less than length of full
 | |
|         // packet, then feed data to sink element immediately
 | |
|         while (cacheBuffer != null) {
 | |
|             if (paused) {
 | |
|                 if (verbose)
 | |
|                     System.out.println("[" + this + "] INFO: Transfer is paused. Data in cache buffer: " + cacheBuffer + ".");
 | |
| 
 | |
|                 // Wait until rest of packet will be read
 | |
|                 return;
 | |
|             }
 | |
| 
 | |
|             if (expectedPacketSize > 0 && cacheBuffer.length < expectedPacketSize) {
 | |
|                 if (verbose)
 | |
|                     System.out.println("[" + this + "] INFO: Transfer is suspended because available data is less than expected packet size. Expected packet size: " +
 | |
|                         expectedPacketSize + ", data in cache buffer: " + cacheBuffer + ".");
 | |
| 
 | |
|                 // Wait until rest of packet will be read
 | |
|                 return;
 | |
|             }
 | |
| 
 | |
|             // Full packet or packet header is read, feed it to element
 | |
|             buf = cacheBuffer;
 | |
|             cacheBuffer = null;
 | |
|             expectedPacketSize = 0;
 | |
|             packetNumber++;
 | |
| 
 | |
|             if (sink == null)
 | |
|                 throw new NullPointerException("[" + this + "] ERROR: Cannot send data to sink: sink is null. Data: " + buf + ".");
 | |
| 
 | |
|             sink.handleData(buf, this);
 | |
|             // cacheBuffer and expectedPacketSize can be changed at this time
 | |
|         }
 | |
| 
 | |
|     }
 | |
| 
 | |
|     @SuppressWarnings("incomplete-switch")
 | |
|     @Override
 | |
|     public void sendEvent(Event event, Direction direction) {
 | |
| 
 | |
|         if (verbose)
 | |
|             System.out.println("[" + this + "] INFO: Event " + event + " is received.");
 | |
| 
 | |
|         // Shutdown main loop (if any) when STREAM_CLOSE event is received.
 | |
|         switch (event) {
 | |
|             case STREAM_START: {
 | |
|                 if (!start)
 | |
|                     start = true;
 | |
|                 else
 | |
|                     // Event already sent trough this link
 | |
|                     return;
 | |
|                 break;
 | |
|             }
 | |
|             case STREAM_CLOSE: {
 | |
|                 if (!shutdown)
 | |
|                     shutdown = true;
 | |
|                 else
 | |
|                     // Event already sent trough this link
 | |
|                     return;
 | |
|                 break;
 | |
|             }
 | |
|             case LINK_SWITCH_TO_PULL_MODE: {
 | |
|                 setPullMode();
 | |
|                 break;
 | |
|             }
 | |
| 
 | |
|         }
 | |
| 
 | |
|         switch (direction) {
 | |
|             case IN:
 | |
|                 source.handleEvent(event, direction);
 | |
|                 break;
 | |
|             case OUT:
 | |
|                 sink.handleEvent(event, direction);
 | |
|                 break;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public ByteBuffer pull(boolean block) {
 | |
|         if (!pullMode)
 | |
|             throw new RuntimeException("This link is not in pull mode.");
 | |
| 
 | |
|         if (paused) {
 | |
|             if (verbose)
 | |
|                 System.out.println("[" + this + "] INFO: Cannot pull, link is paused.");
 | |
| 
 | |
|             // Make slight delay in such case, to avoid consuming 100% of CPU
 | |
|             if (block) {
 | |
|                 try {
 | |
|                     Thread.sleep(100);
 | |
|                 } catch (InterruptedException e) {
 | |
|                 }
 | |
|             }
 | |
| 
 | |
|             return null;
 | |
|         }
 | |
| 
 | |
|         // If data in cache can be sent immediately,
 | |
|         // then return it instead of asking for more data from source
 | |
|         if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
 | |
|             if (verbose)
 | |
|                 System.out.println("[" + this + "] INFO: Data pulled from cache buffer: " + cacheBuffer + ".");
 | |
| 
 | |
|             ByteBuffer tmp = cacheBuffer;
 | |
|             cacheBuffer = null;
 | |
|             return tmp;
 | |
|         }
 | |
| 
 | |
|         // Pause this link, so incoming data will not be sent to sink
 | |
|         // immediately, then ask source element for more data
 | |
|         pause();
 | |
|         source.poll(block);
 | |
|         resume();
 | |
| 
 | |
|         // Can return something only when data was stored in buffer
 | |
|         if (cacheBuffer != null && (expectedPacketSize == 0 || (expectedPacketSize > 0 && cacheBuffer.length >= expectedPacketSize))) {
 | |
|             if (verbose)
 | |
|                 System.out.println("[" + this + "] INFO: Data pulled from source: " + cacheBuffer + ".");
 | |
| 
 | |
|             ByteBuffer tmp = cacheBuffer;
 | |
|             cacheBuffer = null;
 | |
|             return tmp;
 | |
|         } else {
 | |
|             return null;
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public Element setSink(Element sink) {
 | |
|         if (sink != null && this.sink != null)
 | |
|             throw new RuntimeException("This link sink element is already set. Link: " + this + ", new sink: " + sink + ", existing sink: " + this.sink + ".");
 | |
| 
 | |
|         if (sink == null && cacheBuffer != null)
 | |
|             throw new RuntimeException("Cannot drop link: cache is not empty. Link: " + this + ", cache: " + cacheBuffer);
 | |
| 
 | |
|         this.sink = sink;
 | |
| 
 | |
|         return sink;
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public Element setSource(Element source) {
 | |
|         if (this.source != null && source != null)
 | |
|             throw new RuntimeException("This link source element is already set. Link: " + this + ", new source: " + source + ", existing source: " + this.source + ".");
 | |
| 
 | |
|         this.source = source;
 | |
|         return source;
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public Element getSource() {
 | |
|         return source;
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public Element getSink() {
 | |
|         return sink;
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public void pause() {
 | |
|         if (paused)
 | |
|             throw new RuntimeException("Link is already paused.");
 | |
| 
 | |
|         paused = true;
 | |
| 
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public void resume() {
 | |
|         paused = false;
 | |
|     }
 | |
| 
 | |
|     /**
 | |
|      * Run pull loop to actively pull data from source and push it to sink. It
 | |
|      * must be only one pull loop per thread.
 | |
|      *
 | |
|      * Pull loop will start after event STREAM_START. This link and source element
 | |
|      * incomming links will be switched to pull mode before pull loop will be
 | |
|      * started using event LINK_SWITCH_TO_PULL_MODE.
 | |
|      */
 | |
|     @Override
 | |
|     public void run() {
 | |
|         // Wait until even STREAM_START will arrive
 | |
|         while (!start) {
 | |
|             delay();
 | |
|         }
 | |
| 
 | |
|         sendEvent(Event.LINK_SWITCH_TO_PULL_MODE, Direction.IN);
 | |
| 
 | |
|         if (verbose)
 | |
|             System.out.println("[" + this + "] INFO: Starting pull loop.");
 | |
| 
 | |
|         // Pull source in loop
 | |
|         while (!shutdown) {
 | |
|             // Pull data from source element and send it to sink element
 | |
|             ByteBuffer data = pull(true);
 | |
|             if (data != null)
 | |
|                 sink.handleData(data, this);
 | |
| 
 | |
|             if (!shutdown && data == null) {
 | |
|                 // Make slight delay to avoid consuming of 100% of CPU
 | |
|                 delay();
 | |
|             }
 | |
|         }
 | |
| 
 | |
|         if (verbose)
 | |
|             System.out.println("[" + this + "] INFO: Pull loop finished.");
 | |
| 
 | |
|     }
 | |
| 
 | |
|     protected void delay() {
 | |
|         try {
 | |
|             Thread.sleep(delay);
 | |
|         } catch (InterruptedException e) {
 | |
|             e.printStackTrace(System.err);
 | |
|             throw new RuntimeException("Interrupted in main loop.", e);
 | |
|         }
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public void setPullMode() {
 | |
|         if (verbose)
 | |
|             System.out.println("[" + this + "] INFO: Switching to PULL mode.");
 | |
| 
 | |
|         this.pullMode = true;
 | |
|     }
 | |
| 
 | |
|     @Override
 | |
|     public void drop() {
 | |
|         if (pullMode)
 | |
|             throw new RuntimeException("Cannot drop link in pull mode.");
 | |
| 
 | |
|         if (cacheBuffer != null)
 | |
|             throw new RuntimeException("Cannot drop link when cache conatains data: " + cacheBuffer + ".");
 | |
| 
 | |
|         source.dropLink(this);
 | |
|         sink.dropLink(this);
 | |
|     }
 | |
| }
 |