COM-29: Attempt to fix the keysync integration as discussed with VB.
authorMarkus Schaber <markus@pep-security.net>
Sun, 23 Oct 2016 23:27:46 +0200
changeset 188cf4a4e973ab2
parent 187 4ba92936a9d9
child 189 5361d677afdb
COM-29: Attempt to fix the keysync integration as discussed with VB.
CpEpEngine.cpp
CpEpEngine.h
pEpCOMServerAdapter.idl
     1.1 --- a/CpEpEngine.cpp	Thu Oct 20 17:53:00 2016 +0200
     1.2 +++ b/CpEpEngine.cpp	Sun Oct 23 23:27:46 2016 +0200
     1.3 @@ -730,18 +730,8 @@
     1.4  	// acquire the lock
     1.5  	std::unique_lock<std::mutex> lock(keysync_mutex);
     1.6  
     1.7 -	// Check if we're already running.
     1.8 -	if (keysync_thread_running) 
     1.9 -	{
    1.10 -		// If we have pending aborts, we need to wake up those threads
    1.11 -		// and cancel the pending abort.
    1.12 -		if (keysync_abort_requested) 
    1.13 -		{
    1.14 -			keysync_abort_requested = false;
    1.15 -			keysync_condition.notify_all();
    1.16 -		}
    1.17 -		return;
    1.18 -	}
    1.19 +	// Assert if we're not already running.
    1.20 +    assert(!this->keysync_thread);
    1.21  
    1.22  	// Ensure we are not aborting the new thread due to a
    1.23  	// left over flag.
    1.24 @@ -754,11 +744,39 @@
    1.25  
    1.26      attach_sync_session(get_session(), keysync_session);
    1.27  
    1.28 +    // We need to marshal the callbacks to the keysync thread
    1.29 +    LPSTREAM marshaled_callbacks;
    1.30 +
    1.31 +    auto result = CoMarshalInterThreadInterfaceInStream(IID_IpEpEngineCallbacks, client_callbacks, &marshaled_callbacks);
    1.32 +    assert(result == S_OK);
    1.33 +
    1.34  	// Star the keysync thread
    1.35 -	keysync_thread = new thread(::do_sync_protocol, keysync_session, this);
    1.36 +	keysync_thread = new thread(do_keysync_in_thread, this, marshaled_callbacks);
    1.37 +}
    1.38  
    1.39 -	// flag to signal we're running
    1.40 -	keysync_thread_running = true;
    1.41 +void CpEpEngine::do_keysync_in_thread(CpEpEngine* self, LPSTREAM marshaled_callbacks) 
    1.42 +{
    1.43 +    assert(self);
    1.44 +    // We need to initialize COM here for successfull delivery of the callbacks.
    1.45 +    // As we don't create any COM instances in our thread, the COMINIT value is
    1.46 +    // currently irrelevant, so we go with the safest value.
    1.47 +    auto res = CoInitializeEx(NULL, COINIT_APARTMENTTHREADED);
    1.48 +    assert(res == S_OK);
    1.49 +
    1.50 +    LPVOID vp;
    1.51 +
    1.52 +    res = CoGetInterfaceAndReleaseStream(marshaled_callbacks, IID_IpEpEngineCallbacks, &vp);
    1.53 +    assert(SUCCEEDED(res));
    1.54 +
    1.55 +    self->client_callbacks_on_sync_thread = static_cast<IpEpEngineCallbacks*>(vp);
    1.56 +
    1.57 +    ::do_sync_protocol(self->keysync_session, self);
    1.58 +
    1.59 +    self->client_callbacks_on_sync_thread->Release();
    1.60 +
    1.61 +    self->client_callbacks_on_sync_thread = NULL;
    1.62 +
    1.63 +    CoUninitialize();
    1.64  }
    1.65  
    1.66  void CpEpEngine::stop_keysync()
    1.67 @@ -766,10 +784,11 @@
    1.68  	// acquire the lock
    1.69  	std::unique_lock<std::mutex> lock(keysync_mutex);
    1.70  
    1.71 -	// check whether we're not running, or there's a concurrent abort
    1.72 -	if (keysync_abort_requested || !keysync_thread_running)
    1.73 -		return;
    1.74 +    // Do nothing if keysync is not running.
    1.75 +    if (!keysync_thread)
    1.76 +        return;
    1.77  
    1.78 +    assert(!keysync_abort_requested);
    1.79  	// signal that we're gonna abort
    1.80  	keysync_abort_requested = true;
    1.81  
    1.82 @@ -777,24 +796,20 @@
    1.83  	keysync_condition.notify_all();
    1.84  
    1.85  	// Wait for the other thread to finish and clean up
    1.86 -	while (keysync_thread_running && keysync_abort_requested)
    1.87 +	while (keysync_abort_requested)
    1.88  		keysync_condition.wait(lock);
    1.89  
    1.90 -	if (!keysync_abort_requested)
    1.91 -		return; // someone called start_keysync() while we were trying to stop it...
    1.92 -
    1.93 -    detach_sync_session(get_session());
    1.94 -
    1.95 -	// wait for the thread to end
    1.96 +	// collect the child thread for the thread to end
    1.97  	keysync_thread->join();
    1.98  
    1.99  	// clean up
   1.100  	delete keysync_thread;
   1.101  	keysync_thread = NULL;
   1.102 +
   1.103 +    ::detach_sync_session(get_session());
   1.104  	::unregister_sync_callbacks(keysync_session);
   1.105  	release(keysync_session);
   1.106      keysync_session = NULL;
   1.107 -	keysync_abort_requested = false;
   1.108  }
   1.109  
   1.110  int CpEpEngine::inject_sync_msg(void * msg, void * management)
   1.111 @@ -810,8 +825,8 @@
   1.112  	// acquire the lock
   1.113  	std::unique_lock<std::mutex> lock(me->keysync_mutex);
   1.114  
   1.115 -	// check whether we're running:
   1.116 -	if (me->keysync_abort_requested || !me->keysync_thread_running)
   1.117 +	// check whether we're in a valid state running:
   1.118 +	if (!me->keysync_thread)
   1.119  		return E_ASYNC_OPERATION_NOT_STARTED;
   1.120  
   1.121  	// queue the message
   1.122 @@ -854,7 +869,7 @@
   1.123  	}
   1.124  
   1.125  	// we acknowledge that we're quitting...
   1.126 -	me->keysync_thread_running = false;
   1.127 +	me->keysync_abort_requested = false;
   1.128  
   1.129  	// We signal the main thread that we got his signal
   1.130  	// so it can gain the mutex again and call join() on us.
   1.131 @@ -869,32 +884,36 @@
   1.132  
   1.133  STDMETHODIMP CpEpEngine::RegisterCallbacks(IpEpEngineCallbacks* new_callbacks)
   1.134  {
   1.135 -	callbacks cbs = get_callbacks();
   1.136 -	vector<IpEpEngineCallbacks*>& vec = cbs;
   1.137 +    // check for valid parameter
   1.138 +    if (!new_callbacks)
   1.139 +        return E_INVALIDARG;
   1.140  
   1.141 -	vec.push_back(new_callbacks);
   1.142 -	new_callbacks->AddRef();
   1.143 +    // don't allow double registration.
   1.144 +    if (this->client_callbacks)
   1.145 +        return E_ILLEGAL_STATE_CHANGE;
   1.146 +
   1.147 +    this->client_callbacks = new_callbacks;
   1.148 +    new_callbacks->AddRef();
   1.149 +
   1.150  	start_keysync();
   1.151  
   1.152  	return S_OK;
   1.153  }
   1.154  
   1.155 -STDMETHODIMP CpEpEngine::UnregisterCallbacks(IpEpEngineCallbacks* obsolete_callbacks)
   1.156 +STDMETHODIMP CpEpEngine::UnregisterCallbacks()
   1.157  {
   1.158 -	callbacks cbs = get_callbacks();
   1.159 -	vector<IpEpEngineCallbacks*>& vec = cbs;
   1.160 +    // don't allow double deregistration.
   1.161 +    // S_FALSE still is no error (as double deregistration is not fatal).
   1.162 +    if (!this->client_callbacks)
   1.163 +        return S_FALSE;
   1.164  
   1.165 -	auto position = std::find(vec.begin(), vec.end(), obsolete_callbacks);
   1.166 -	if (position != vec.end()) {
   1.167 -		vec.erase(position);
   1.168 -		obsolete_callbacks->Release();
   1.169 -		if (vec.empty())
   1.170 -			stop_keysync();
   1.171 +    stop_keysync();
   1.172  
   1.173 -		return S_OK;
   1.174 -	}
   1.175 +    this->client_callbacks->Release();
   1.176  
   1.177 -	return S_FALSE;
   1.178 +    this->client_callbacks = NULL;
   1.179 +
   1.180 +    return S_OK;
   1.181  }
   1.182  
   1.183  STDMETHODIMP CpEpEngine::OpenPGPListKeyinfo(BSTR search_pattern, LPSAFEARRAY* keyinfo_list) {
   1.184 @@ -931,34 +950,22 @@
   1.185  
   1.186  HRESULT CpEpEngine::Fire_MessageToSend(TextMessage * msg)
   1.187  {
   1.188 -	callbacks cbs = get_callbacks();
   1.189 -	vector<IpEpEngineCallbacks*>& vec = cbs;
   1.190 +	assert(msg);
   1.191 +    assert(this->client_callbacks_on_sync_thread);
   1.192  
   1.193 -	assert(msg);
   1.194 +    auto result = this->client_callbacks_on_sync_thread->MessageToSend(msg);
   1.195  
   1.196 -	for (auto it = vec.begin(); it != vec.end(); ++it)
   1.197 -	{
   1.198 -		auto res = (*it)->MessageToSend(msg);
   1.199 -		if (res != S_OK)
   1.200 -			return res;
   1.201 -	}
   1.202 -	return S_OK;
   1.203 +	return result;
   1.204  }
   1.205  
   1.206  HRESULT CpEpEngine::Fire_ShowHandshake(pEpIdentity * self, pEpIdentity * partner, SyncHandshakeResult * result)
   1.207  {
   1.208 -	callbacks cbs = get_callbacks();
   1.209 -	vector<IpEpEngineCallbacks*>& vec = cbs;
   1.210 -
   1.211  	assert(self);
   1.212  	assert(partner);
   1.213  	assert(result);
   1.214 -
   1.215 -	for (auto it = vec.begin(); it != vec.end(); ++it)
   1.216 -	{
   1.217 -		auto res = (*it)->ShowHandshake(self, partner, result);
   1.218 -		if (res != S_OK)
   1.219 -			return res;
   1.220 -	}
   1.221 -	return S_OK;
   1.222 +    assert(this->client_callbacks_on_sync_thread);
   1.223 +    	
   1.224 +	auto res = this->client_callbacks_on_sync_thread->ShowHandshake(self, partner, result);
   1.225 +		
   1.226 +	return res;	
   1.227  }
     2.1 --- a/CpEpEngine.h	Thu Oct 20 17:53:00 2016 +0200
     2.2 +++ b/CpEpEngine.h	Sun Oct 23 23:27:46 2016 +0200
     2.3 @@ -100,34 +100,6 @@
     2.4          return session(this);
     2.5      }
     2.6  
     2.7 -	class callbacks
     2.8 -	{
     2.9 -	private:
    2.10 -		CpEpEngine *me;
    2.11 -
    2.12 -	public:
    2.13 -		callbacks(CpEpEngine *myself)
    2.14 -		{
    2.15 -			me = myself;
    2.16 -			me->callback_mutex.lock();
    2.17 -		}
    2.18 -
    2.19 -		~callbacks()
    2.20 -		{
    2.21 -			me->callback_mutex.unlock();
    2.22 -		}
    2.23 -
    2.24 -		operator vector<IpEpEngineCallbacks *>& ()
    2.25 -		{
    2.26 -			return me->callback_vector;
    2.27 -		}
    2.28 -	};
    2.29 -
    2.30 -	callbacks get_callbacks()
    2.31 -	{
    2.32 -		return callbacks(this);
    2.33 -	}
    2.34 -
    2.35      typedef locked_queue<pEp_identity_cpp> identity_queue_t;
    2.36      static ::pEp_identity * retrieve_next_identity(void *management);
    2.37      static PEP_STATUS messageToSend(void * obj, message *msg);
    2.38 @@ -152,20 +124,20 @@
    2.39      thread *keymanagement_thread;
    2.40      bool verbose_mode;
    2.41  
    2.42 -	mutex callback_mutex;
    2.43 -	vector<IpEpEngineCallbacks*> callback_vector;
    2.44 +	IpEpEngineCallbacks* client_callbacks = NULL;
    2.45 +    IpEpEngineCallbacks* client_callbacks_on_sync_thread = NULL;
    2.46  
    2.47  	// Keysync members
    2.48      static int inject_sync_msg(void *msg, void* management);
    2.49      static void* retrieve_next_sync_msg(void* management);
    2.50      void start_keysync();
    2.51 +    static void do_keysync_in_thread(CpEpEngine* self, LPSTREAM marshaled_callbacks);
    2.52      void stop_keysync();
    2.53  
    2.54      std::mutex keysync_mutex;
    2.55      std::condition_variable keysync_condition;
    2.56      std::thread *keysync_thread = NULL;
    2.57      std::queue<void*> keysync_queue;
    2.58 -    bool keysync_thread_running = false;
    2.59      bool keysync_abort_requested = false;
    2.60      PEP_SESSION keysync_session;
    2.61  
    2.62 @@ -218,7 +190,7 @@
    2.63  	// Event callbacks
    2.64  
    2.65  	STDMETHOD(RegisterCallbacks)(IpEpEngineCallbacks *new_callback);
    2.66 -	STDMETHOD(UnregisterCallbacks)(IpEpEngineCallbacks *obsolete_callback);
    2.67 +	STDMETHOD(UnregisterCallbacks)();
    2.68  
    2.69      // PGP compatibility functions
    2.70      STDMETHOD(OpenPGPListKeyinfo)(BSTR search_pattern, LPSAFEARRAY* keyinfo_list);
     3.1 --- a/pEpCOMServerAdapter.idl	Thu Oct 20 17:53:00 2016 +0200
     3.2 +++ b/pEpCOMServerAdapter.idl	Sun Oct 23 23:27:46 2016 +0200
     3.3 @@ -227,7 +227,7 @@
     3.4  
     3.5      // callback / keysync API
     3.6      HRESULT RegisterCallbacks([in] IpEpEngineCallbacks* newCallback);
     3.7 -    HRESULT UnregisterCallbacks([in] IpEpEngineCallbacks* obsoleteCallback);
     3.8 +    HRESULT UnregisterCallbacks();
     3.9  };
    3.10  
    3.11  [
    3.12 @@ -243,5 +243,6 @@
    3.13      ]
    3.14      coclass pEpEngine {
    3.15          [default] interface IpEpEngine;
    3.16 +        interface IpEpEngineCallbacks;
    3.17      };
    3.18  };