aboutsummaryrefslogtreecommitdiff
path: root/crypto/krb5/src/tests/kcmserver.py
diff options
context:
space:
mode:
Diffstat (limited to 'crypto/krb5/src/tests/kcmserver.py')
-rw-r--r--crypto/krb5/src/tests/kcmserver.py336
1 files changed, 0 insertions, 336 deletions
diff --git a/crypto/krb5/src/tests/kcmserver.py b/crypto/krb5/src/tests/kcmserver.py
deleted file mode 100644
index f6dfcb711b3a..000000000000
--- a/crypto/krb5/src/tests/kcmserver.py
+++ /dev/null
@@ -1,336 +0,0 @@
-# This is a simple KCM test server, used to exercise the KCM ccache
-# client code. It will generally throw an uncaught exception if the
-# client sends anything unexpected, so is unsuitable for production.
-# (It also imposes no namespace or access constraints, and blocks
-# while reading requests and writing responses.)
-
-# This code knows nothing about how to marshal and unmarshal principal
-# names and credentials as is required in the KCM protocol; instead,
-# it just remembers the marshalled forms and replays them to the
-# client when asked. This works because marshalled creds and
-# principal names are always the last part of marshalled request
-# arguments, and because we don't need to implement remove_cred (which
-# would need to know how to match a cred tag against previously stored
-# credentials).
-
-# The following code is useful for debugging if anything appears to be
-# going wrong in the server, since daemon output is generally not
-# visible in Python test scripts.
-#
-# import sys, traceback
-# def ehook(etype, value, tb):
-# with open('/tmp/exception', 'w') as f:
-# traceback.print_exception(etype, value, tb, file=f)
-# sys.excepthook = ehook
-
-import optparse
-import select
-import socket
-import struct
-import sys
-
-caches = {}
-cache_uuidmap = {}
-defname = b'default'
-next_unique = 1
-next_uuid = 1
-
-class KCMOpcodes(object):
- GEN_NEW = 3
- INITIALIZE = 4
- DESTROY = 5
- STORE = 6
- RETRIEVE = 7
- GET_PRINCIPAL = 8
- GET_CRED_UUID_LIST = 9
- GET_CRED_BY_UUID = 10
- REMOVE_CRED = 11
- GET_CACHE_UUID_LIST = 18
- GET_CACHE_BY_UUID = 19
- GET_DEFAULT_CACHE = 20
- SET_DEFAULT_CACHE = 21
- GET_KDC_OFFSET = 22
- SET_KDC_OFFSET = 23
- GET_CRED_LIST = 13001
- REPLACE = 13002
-
-
-class KRB5Errors(object):
- KRB5_CC_NOTFOUND = -1765328243
- KRB5_CC_END = -1765328242
- KRB5_CC_NOSUPP = -1765328137
- KRB5_FCC_NOFILE = -1765328189
- KRB5_FCC_INTERNAL = -1765328188
-
-
-def make_uuid():
- global next_uuid
- uuid = bytes(12) + struct.pack('>L', next_uuid)
- next_uuid = next_uuid + 1
- return uuid
-
-
-class Cache(object):
- def __init__(self, name):
- self.name = name
- self.princ = None
- self.uuid = make_uuid()
- self.cred_uuids = []
- self.creds = {}
- self.time_offset = 0
-
-
-def get_cache(name):
- if name in caches:
- return caches[name]
- cache = Cache(name)
- caches[name] = cache
- cache_uuidmap[cache.uuid] = cache
- return cache
-
-
-def unmarshal_name(argbytes):
- offset = argbytes.find(b'\0')
- return argbytes[0:offset], argbytes[offset+1:]
-
-
-# Find the bounds of a marshalled principal, returning it and the
-# remainder of argbytes.
-def extract_princ(argbytes):
- ncomps, rlen = struct.unpack('>LL', argbytes[4:12])
- pos = 12 + rlen
- for i in range(ncomps):
- clen, = struct.unpack('>L', argbytes[pos:pos+4])
- pos += 4 + clen
- return argbytes[0:pos], argbytes[pos:]
-
-
-# Return true if the marshalled principals p1 and p2 name the same
-# principal.
-def princ_eq(p1, p2):
- # Ignore the name-types at bytes 0..3. The remaining bytes should
- # be identical if the principals are the same.
- return p1[4:] == p2[4:]
-
-
-def op_gen_new(argbytes):
- # Does not actually check for uniqueness.
- global next_unique
- name = b'unique' + str(next_unique).encode('ascii')
- next_unique += 1
- return 0, name + b'\0'
-
-
-def op_initialize(argbytes):
- name, princ = unmarshal_name(argbytes)
- cache = get_cache(name)
- cache.princ = princ
- cache.cred_uuids = []
- cache.creds = {}
- cache.time_offset = 0
- return 0, b''
-
-
-def op_destroy(argbytes):
- name, rest = unmarshal_name(argbytes)
- cache = get_cache(name)
- del cache_uuidmap[cache.uuid]
- del caches[name]
- return 0, b''
-
-
-def op_store(argbytes):
- name, cred = unmarshal_name(argbytes)
- cache = get_cache(name)
- uuid = make_uuid()
- cache.creds[uuid] = cred
- cache.cred_uuids.append(uuid)
- return 0, b''
-
-
-def op_retrieve(argbytes):
- name, rest = unmarshal_name(argbytes)
- # Ignore the flags at rest[0:4] and the header at rest[4:8].
- # Assume there are client and server creds in the tag and match
- # only against them.
- cprinc, rest = extract_princ(rest[8:])
- sprinc, rest = extract_princ(rest)
- cache = get_cache(name)
- for cred in (cache.creds[u] for u in cache.cred_uuids):
- cred_cprinc, rest = extract_princ(cred)
- cred_sprinc, rest = extract_princ(rest)
- if princ_eq(cred_cprinc, cprinc) and princ_eq(cred_sprinc, sprinc):
- return 0, cred
- return KRB5Errors.KRB5_CC_NOTFOUND, b''
-
-
-def op_get_principal(argbytes):
- name, rest = unmarshal_name(argbytes)
- cache = get_cache(name)
- if cache.princ is None:
- return KRB5Errors.KRB5_FCC_NOFILE, b''
- return 0, cache.princ + b'\0'
-
-
-def op_get_cred_uuid_list(argbytes):
- name, rest = unmarshal_name(argbytes)
- cache = get_cache(name)
- return 0, b''.join(cache.cred_uuids)
-
-
-def op_get_cred_by_uuid(argbytes):
- name, uuid = unmarshal_name(argbytes)
- cache = get_cache(name)
- if uuid not in cache.creds:
- return KRB5Errors.KRB5_CC_END, b''
- return 0, cache.creds[uuid]
-
-
-def op_remove_cred(argbytes):
- return KRB5Errors.KRB5_CC_NOSUPP, b''
-
-
-def op_get_cache_uuid_list(argbytes):
- return 0, b''.join(cache_uuidmap.keys())
-
-
-def op_get_cache_by_uuid(argbytes):
- uuid = argbytes
- if uuid not in cache_uuidmap:
- return KRB5Errors.KRB5_CC_END, b''
- return 0, cache_uuidmap[uuid].name + b'\0'
-
-
-def op_get_default_cache(argbytes):
- return 0, defname + b'\0'
-
-
-def op_set_default_cache(argbytes):
- global defname
- defname, rest = unmarshal_name(argbytes)
- return 0, b''
-
-
-def op_get_kdc_offset(argbytes):
- name, rest = unmarshal_name(argbytes)
- cache = get_cache(name)
- return 0, struct.pack('>l', cache.time_offset)
-
-
-def op_set_kdc_offset(argbytes):
- name, obytes = unmarshal_name(argbytes)
- cache = get_cache(name)
- cache.time_offset, = struct.unpack('>l', obytes)
- return 0, b''
-
-
-def op_get_cred_list(argbytes):
- name, rest = unmarshal_name(argbytes)
- cache = get_cache(name)
- creds = [cache.creds[u] for u in cache.cred_uuids]
- return 0, (struct.pack('>L', len(creds)) +
- b''.join(struct.pack('>L', len(c)) + c for c in creds))
-
-
-def op_replace(argbytes):
- name, rest = unmarshal_name(argbytes)
- offset, = struct.unpack('>L', rest[0:4])
- princ, rest = extract_princ(rest[4:])
- ncreds, = struct.unpack('>L', rest[0:4])
- rest = rest[4:]
- creds = []
- for i in range(ncreds):
- len, = struct.unpack('>L', rest[0:4])
- creds.append(rest[4:4+len])
- rest = rest[4+len:]
-
- cache = get_cache(name)
- cache.princ = princ
- cache.cred_uuids = []
- cache.creds = {}
- cache.time_offset = offset
- for i in range(ncreds):
- uuid = make_uuid()
- cache.creds[uuid] = creds[i]
- cache.cred_uuids.append(uuid)
-
- return 0, b''
-
-
-ophandlers = {
- KCMOpcodes.GEN_NEW : op_gen_new,
- KCMOpcodes.INITIALIZE : op_initialize,
- KCMOpcodes.DESTROY : op_destroy,
- KCMOpcodes.STORE : op_store,
- KCMOpcodes.RETRIEVE : op_retrieve,
- KCMOpcodes.GET_PRINCIPAL : op_get_principal,
- KCMOpcodes.GET_CRED_UUID_LIST : op_get_cred_uuid_list,
- KCMOpcodes.GET_CRED_BY_UUID : op_get_cred_by_uuid,
- KCMOpcodes.REMOVE_CRED : op_remove_cred,
- KCMOpcodes.GET_CACHE_UUID_LIST : op_get_cache_uuid_list,
- KCMOpcodes.GET_CACHE_BY_UUID : op_get_cache_by_uuid,
- KCMOpcodes.GET_DEFAULT_CACHE : op_get_default_cache,
- KCMOpcodes.SET_DEFAULT_CACHE : op_set_default_cache,
- KCMOpcodes.GET_KDC_OFFSET : op_get_kdc_offset,
- KCMOpcodes.SET_KDC_OFFSET : op_set_kdc_offset,
- KCMOpcodes.GET_CRED_LIST : op_get_cred_list,
- KCMOpcodes.REPLACE : op_replace
-}
-
-# Read and respond to a request from the socket s.
-def service_request(s):
- lenbytes = b''
- while len(lenbytes) < 4:
- lenbytes += s.recv(4 - len(lenbytes))
- if lenbytes == b'':
- return False
-
- reqlen, = struct.unpack('>L', lenbytes)
- req = b''
- while len(req) < reqlen:
- req += s.recv(reqlen - len(req))
-
- majver, minver, op = struct.unpack('>BBH', req[:4])
- argbytes = req[4:]
-
- if op in ophandlers:
- code, payload = ophandlers[op](argbytes)
- else:
- code, payload = KRB5Errors.KRB5_FCC_INTERNAL, b''
-
- # The KCM response is the code (4 bytes) and the response payload.
- # The Heimdal IPC response is the length of the KCM response (4
- # bytes), a status code which is essentially always 0 (4 bytes),
- # and the KCM response.
- kcm_response = struct.pack('>l', code) + payload
- hipc_response = struct.pack('>LL', len(kcm_response), 0) + kcm_response
- s.sendall(hipc_response)
- return True
-
-parser = optparse.OptionParser()
-parser.add_option('-f', '--fallback', action='store_true', dest='fallback',
- default=False,
- help='Do not support RETRIEVE/GET_CRED_LIST/REPLACE')
-(options, args) = parser.parse_args()
-if options.fallback:
- del ophandlers[KCMOpcodes.RETRIEVE]
- del ophandlers[KCMOpcodes.GET_CRED_LIST]
- del ophandlers[KCMOpcodes.REPLACE]
-
-server = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
-server.bind(args[0])
-server.listen(5)
-select_input = [server,]
-sys.stderr.write('starting...\n')
-sys.stderr.flush()
-
-while True:
- iready, oready, xready = select.select(select_input, [], [])
- for s in iready:
- if s == server:
- client, addr = server.accept()
- select_input.append(client)
- else:
- if not service_request(s):
- select_input.remove(s)
- s.close()