add RPC client side implementation

This commit is contained in:
Kelven Yang 2012-12-20 17:54:45 -08:00
parent c216990e1c
commit 1d5019c3d0
11 changed files with 401 additions and 11 deletions

View File

@ -0,0 +1,81 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.client;
import java.util.ArrayList;
import java.util.List;
import org.apache.cloudstack.framework.transport.TransportAddress;
import org.apache.cloudstack.framework.transport.TransportAttachResponsePdu;
import org.apache.cloudstack.framework.transport.TransportConnectResponsePdu;
import org.apache.cloudstack.framework.transport.TransportPdu;
public class ClientTransportConnection {
enum State {
Idle,
Connecting,
Open,
Closing
}
private ClientTransportProvider _provider;
// TODO, use state machine
private State _state = State.Idle;
private TransportAddress _connectionTpAddress;
private List<TransportPdu> _outputQueue = new ArrayList<TransportPdu>();
public ClientTransportConnection(ClientTransportProvider provider) {
_provider = provider;
}
public void connect(String serverAddress, int serverPort) {
boolean doConnect = false;
synchronized(this) {
if(_state == State.Idle) {
setState(State.Connecting);
doConnect = true;
}
}
if(doConnect) {
// ???
}
}
public void handleConnectResponsePdu(TransportConnectResponsePdu pdu) {
// TODO assume it is always succeeds
_connectionTpAddress = TransportAddress.fromAddressString(pdu.getDestAddress());
// ???
}
public void handleAttachResponsePdu(TransportAttachResponsePdu pdu) {
// ???
}
private void setState(State state) {
synchronized(this) {
if(_state != state) {
_state = state;
}
}
}
}

View File

@ -0,0 +1,47 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.client;
import org.apache.cloudstack.framework.transport.TransportEndpoint;
import org.apache.cloudstack.framework.transport.TransportEndpointSite;
import org.apache.cloudstack.framework.transport.TransportProvider;
public class ClientTransportEndpointSite extends TransportEndpointSite {
private String _predefinedAddress;
private int _providerKey;
public ClientTransportEndpointSite(TransportProvider provider, TransportEndpoint endpoint, String predefinedAddress, int providerKey) {
super(provider, endpoint);
_predefinedAddress = predefinedAddress;
_providerKey = providerKey;
}
public String getPredefinedAddress() {
return _predefinedAddress;
}
public int getProviderKey() {
return _providerKey;
}
public void setProviderKey(int providerKey) {
_providerKey = providerKey;
}
}

View File

@ -18,19 +18,78 @@
*/
package org.apache.cloudstack.framework.client;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import org.apache.cloudstack.framework.serializer.MessageSerializer;
import org.apache.cloudstack.framework.transport.TransportEndpoint;
import org.apache.cloudstack.framework.transport.TransportEndpointSite;
import org.apache.cloudstack.framework.transport.TransportProvider;
import com.cloud.utils.concurrency.NamedThreadFactory;
public class ClientTransportProvider implements TransportProvider {
public static final int DEFAULT_WORKER_POOL_SIZE = 5;
private Map<Integer, ClientTransportEndpointSite> _endpointSites = new HashMap<Integer, ClientTransportEndpointSite>();
private Map<String, ClientTransportEndpointSite> _attachedMap = new HashMap<String, ClientTransportEndpointSite>();
private MessageSerializer _messageSerializer;
private ClientTransportConnection _connection;
private String _serverAddress;
private int _serverPort;
private int _poolSize = DEFAULT_WORKER_POOL_SIZE;
private ExecutorService _executor;
private int _nextProviderKey = 1;
public ClientTransportProvider() {
}
public ClientTransportProvider setPoolSize(int poolSize) {
_poolSize = poolSize;
return this;
}
public void initialize(String serverAddress, int serverPort) {
_serverAddress = serverAddress;
_serverPort = serverPort;
_executor = Executors.newFixedThreadPool(_poolSize, new NamedThreadFactory("Transport-Worker"));
_connection = new ClientTransportConnection(this);
_executor.execute(new Runnable() {
@Override
public void run() {
try {
_connection.connect(_serverAddress, _serverPort);
} catch(Throwable e) {
}
}
});
}
@Override
public TransportEndpointSite attach(TransportEndpoint endpoint, String predefinedAddress) {
// TODO Auto-generated method stub
return null;
ClientTransportEndpointSite endpointSite;
synchronized(this) {
endpointSite = getEndpointSite(endpoint);
if(endpointSite != null) {
// already attached
return endpointSite;
}
endpointSite = new ClientTransportEndpointSite(this, endpoint, predefinedAddress, getNextProviderKey());
_endpointSites.put(endpointSite.getProviderKey(), endpointSite);
}
return endpointSite;
}
@Override
@ -61,4 +120,21 @@ public class ClientTransportProvider implements TransportProvider {
String multiplexier, String message) {
// TODO
}
private ClientTransportEndpointSite getEndpointSite(TransportEndpoint endpoint) {
synchronized(this) {
for(ClientTransportEndpointSite endpointSite : _endpointSites.values()) {
if(endpointSite.getEndpoint() == endpoint)
return endpointSite;
}
}
return null;
}
public int getNextProviderKey() {
synchronized(this) {
return _nextProviderKey++;
}
}
}

View File

@ -167,7 +167,7 @@ public class RpcProviderImpl implements RpcProvider {
RpcServerCall call = new RpcServerCallImpl(this, sourceAddress, targetAddress, pdu);
// TODO, we are trying to avoid locking when calling into callbacks
// this can be optimized later
// this should be optimized later
List<RpcServiceEndpoint> endpoints = new ArrayList<RpcServiceEndpoint>();
synchronized(_serviceEndpoints) {
endpoints.addAll(_serviceEndpoints);

View File

@ -95,10 +95,10 @@ public class ServerTransportProvider implements TransportProvider {
String endpointId;
if(predefinedAddress != null && !predefinedAddress.isEmpty()) {
endpointId = predefinedAddress;
transportAddress = new TransportAddress(_nodeId, endpointId, 0);
transportAddress = new TransportAddress(_nodeId, TransportAddress.LOCAL_SERVICE_CONNECTION, endpointId, 0);
} else {
endpointId = String.valueOf(getNextEndpointId());
transportAddress = new TransportAddress(_nodeId, endpointId);
transportAddress = new TransportAddress(_nodeId, TransportAddress.LOCAL_SERVICE_CONNECTION, endpointId);
}
TransportEndpointSite endpointSite;

View File

@ -23,29 +23,33 @@ import java.util.Random;
public class TransportAddress {
public final static String LOCAL_SERVICE_NODE = "";
public final static int LOCAL_SERVICE_CONNECTION = 0;
private String _nodeId = LOCAL_SERVICE_NODE;
private int _connectionId = LOCAL_SERVICE_CONNECTION;
private String _endpointId;
private int _magic;
public TransportAddress(String nodeId, String endpointId) {
public TransportAddress(String nodeId, int connectionId, String endpointId) {
assert(nodeId != null);
assert(endpointId != null);
assert(nodeId.indexOf(".") < 0);
assert(endpointId.indexOf(".") < 0);
_nodeId = nodeId;
_connectionId = connectionId;
_endpointId = endpointId;
_magic = new Random().nextInt();
}
public TransportAddress(String nodeId, String endpointId, int magic) {
public TransportAddress(String nodeId, int connectionId, String endpointId, int magic) {
assert(nodeId != null);
assert(endpointId != null);
assert(nodeId.indexOf(".") < 0);
assert(endpointId.indexOf(".") < 0);
_nodeId = nodeId;
_connectionId = connectionId;
_endpointId = endpointId;
_magic = magic;
}
@ -59,6 +63,14 @@ public class TransportAddress {
return this;
}
public int getConnectionId() {
return _connectionId;
}
public void setConnectionId(int connectionId) {
_connectionId = connectionId;
}
public String getEndpointId() {
return _endpointId;
}
@ -73,20 +85,21 @@ public class TransportAddress {
return null;
String tokens[] = addressString.split("\\.");
if(tokens.length != 3)
if(tokens.length != 4)
return null;
return new TransportAddress(tokens[0], tokens[1], Integer.parseInt(tokens[2]));
return new TransportAddress(tokens[0], Integer.parseInt(tokens[1]), tokens[2], Integer.parseInt(tokens[3]));
}
public static TransportAddress getLocalPredefinedTransportAddress(String predefinedIdentifier) {
return new TransportAddress(LOCAL_SERVICE_NODE, predefinedIdentifier, 0);
return new TransportAddress(LOCAL_SERVICE_NODE, LOCAL_SERVICE_CONNECTION, predefinedIdentifier, 0);
}
@Override
public int hashCode() {
int hashCode = _magic;
hashCode = (hashCode << 3) ^ _nodeId.hashCode();
hashCode = (hashCode << 3) ^ _connectionId;
hashCode = (hashCode << 3) ^ _endpointId.hashCode();
return hashCode;
@ -103,7 +116,8 @@ public class TransportAddress {
if(this == other)
return true;
return _nodeId.equals(((TransportAddress)other)._nodeId) &&
return _nodeId.equals(((TransportAddress)other)._nodeId) &&
_connectionId == (((TransportAddress)other)._connectionId) &&
_endpointId.equals(((TransportAddress)other)._endpointId) &&
_magic == ((TransportAddress)other)._magic;
}
@ -114,6 +128,8 @@ public class TransportAddress {
if(_nodeId != null)
sb.append(_nodeId);
sb.append(".");
sb.append(_connectionId);
sb.append(".");
sb.append(_endpointId);
sb.append(".");
sb.append(_magic);

View File

@ -0,0 +1,34 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.transport;
public class TransportAttachRequestPdu extends TransportPdu {
private int _endpointProviderKey;
public TransportAttachRequestPdu() {
}
public int getEndpointProviderKey() {
return _endpointProviderKey;
}
public void setEndpointProviderKey(int endpointProviderKey) {
_endpointProviderKey = endpointProviderKey;
}
}

View File

@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.transport;
public class TransportAttachResponsePdu extends TransportPdu {
private int _statusCode;
private int _endpointProviderKey;
public TransportAttachResponsePdu() {
}
public int getStatusCode() {
return _statusCode;
}
public void setStatusCode(int statusCode) {
_statusCode = statusCode;
}
public int getEndpointProviderKey() {
return _endpointProviderKey;
}
public void setEndpointProviderKey(int endpointProviderKey) {
_endpointProviderKey = endpointProviderKey;
}
}

View File

@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.transport;
import org.apache.cloudstack.framework.serializer.OnwireName;
@OnwireName(name="TransportConnectRequestPdu")
public class TransportConnectRequestPdu extends TransportPdu {
String _authIdentity;
String _authCredential;
public TransportConnectRequestPdu() {
}
public String getAuthIdentity() {
return _authIdentity;
}
public void setAuthIdentity(String authIdentity) {
_authIdentity = authIdentity;
}
public String getAuthCredential() {
return _authCredential;
}
public void setAuthCredential(String authCredential) {
_authCredential = authCredential;
}
}

View File

@ -0,0 +1,37 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cloudstack.framework.transport;
import org.apache.cloudstack.framework.serializer.OnwireName;
@OnwireName(name="TransportConnectRequestPdu")
public class TransportConnectResponsePdu extends TransportPdu {
private int _statusCode;
public TransportConnectResponsePdu() {
}
public int getStatusCode() {
return _statusCode;
}
public void setStatusCode(int statusCode) {
_statusCode = statusCode;
}
}

View File

@ -44,6 +44,16 @@ public class TransportEndpointSite {
_outstandingSignalRequests = 0;
}
public TransportEndpointSite(TransportProvider provider, TransportEndpoint endpoint) {
assert(provider != null);
assert(endpoint != null);
_provider = provider;
_endpoint = endpoint;
_outstandingSignalRequests = 0;
}
public TransportEndpoint getEndpoint() {
return _endpoint;