diff --git a/.gitignore b/.gitignore index 13d1490..24d07bf 100644 --- a/.gitignore +++ b/.gitignore @@ -129,3 +129,6 @@ dmypy.json # Pyre type checker .pyre/ +test +.idea + diff --git a/Pipfile b/Pipfile new file mode 100644 index 0000000..59233f7 --- /dev/null +++ b/Pipfile @@ -0,0 +1,21 @@ +[[source]] +url = "https://pypi.douban.com/simple" +verify_ssl = false +name = "pypi" + +[packages] +fastapi = "*" +aioredis = "*" +kafka-python = "*" +uvicorn = "*" +requests = "*" +pipfile = "*" +gunicorn = "*" +uvloop = "*" +httptools = "*" +typing-extensions = "*" + +[dev-packages] + +[requires] +python_version = "3.8" diff --git a/Pipfile.lock b/Pipfile.lock new file mode 100644 index 0000000..a2106ee --- /dev/null +++ b/Pipfile.lock @@ -0,0 +1,264 @@ +{ + "_meta": { + "hash": { + "sha256": "9999b5b71da727c1482bc015f3bdf684ddc91cb7b48163e72da90d5c5b120726" + }, + "pipfile-spec": 6, + "requires": { + "python_version": "3.8" + }, + "sources": [ + { + "name": "pypi", + "url": "https://pypi.douban.com/simple", + "verify_ssl": false + } + ] + }, + "default": { + "aioredis": { + "hashes": [ + "sha256:15f8af30b044c771aee6787e5ec24694c048184c7b9e54c3b60c750a4b93273a", + "sha256:b61808d7e97b7cd5a92ed574937a079c9387fdadd22bfbfa7ad2fd319ecc26e3" + ], + "index": "pypi", + "version": "==1.3.1" + }, + "async-timeout": { + "hashes": [ + "sha256:0c3c816a028d47f659d6ff5c745cb2acf1f966da1fe5c19c77a70282b25f4c5f", + "sha256:4291ca197d287d274d0b6cb5d6f8f8f82d434ed288f962539ff18cc9012f9ea3" + ], + "markers": "python_full_version >= '3.5.3'", + "version": "==3.0.1" + }, + "certifi": { + "hashes": [ + "sha256:1a4995114262bffbc2413b159f2a1a480c969de6e6eb13ee966d470af86af59c", + "sha256:719a74fb9e33b9bd44cc7f3a8d94bc35e4049deebe19ba7d8e108280cfd59830" + ], + "version": "==2020.12.5" + }, + "chardet": { + "hashes": [ + "sha256:0d6f53a15db4120f2b08c94f11e7d93d2c911ee118b6b30a04ec3ee8310179fa", + "sha256:f864054d66fd9118f2e67044ac8981a54775ec5b67aed0441892edb553d21da5" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", + "version": "==4.0.0" + }, + "click": { + "hashes": [ + "sha256:d2b5255c7c6349bc1bd1e59e08cd12acbbd63ce649f2588755783aa94dfb6b1a", + "sha256:dacca89f4bfadd5de3d7489b7c8a566eee0d3676333fbb50030263894c38c0dc" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4'", + "version": "==7.1.2" + }, + "fastapi": { + "hashes": [ + "sha256:63c4592f5ef3edf30afa9a44fa7c6b7ccb20e0d3f68cd9eba07b44d552058dcb", + "sha256:98d8ea9591d8512fdadf255d2a8fa56515cdd8624dca4af369da73727409508e" + ], + "index": "pypi", + "version": "==0.63.0" + }, + "gunicorn": { + "hashes": [ + "sha256:e0a968b5ba15f8a328fdfd7ab1fcb5af4470c28aaf7e55df02a99bc13138e6e8" + ], + "index": "pypi", + "version": "==20.1.0" + }, + "h11": { + "hashes": [ + "sha256:36a3cb8c0a032f56e2da7084577878a035d3b61d104230d4bd49c0c6b555a9c6", + "sha256:47222cb6067e4a307d535814917cd98fd0a57b6788ce715755fa2b6c28b56042" + ], + "markers": "python_version >= '3.6'", + "version": "==0.12.0" + }, + "hiredis": { + "hashes": [ + "sha256:04026461eae67fdefa1949b7332e488224eac9e8f2b5c58c98b54d29af22093e", + "sha256:04927a4c651a0e9ec11c68e4427d917e44ff101f761cd3b5bc76f86aaa431d27", + "sha256:07bbf9bdcb82239f319b1f09e8ef4bdfaec50ed7d7ea51a56438f39193271163", + "sha256:09004096e953d7ebd508cded79f6b21e05dff5d7361771f59269425108e703bc", + "sha256:0adea425b764a08270820531ec2218d0508f8ae15a448568109ffcae050fee26", + "sha256:0b39ec237459922c6544d071cdcf92cbb5bc6685a30e7c6d985d8a3e3a75326e", + "sha256:0d5109337e1db373a892fdcf78eb145ffb6bbd66bb51989ec36117b9f7f9b579", + "sha256:0f41827028901814c709e744060843c77e78a3aca1e0d6875d2562372fcb405a", + "sha256:11d119507bb54e81f375e638225a2c057dda748f2b1deef05c2b1a5d42686048", + "sha256:1233e303645f468e399ec906b6b48ab7cd8391aae2d08daadbb5cad6ace4bd87", + "sha256:139705ce59d94eef2ceae9fd2ad58710b02aee91e7fa0ccb485665ca0ecbec63", + "sha256:1f03d4dadd595f7a69a75709bc81902673fa31964c75f93af74feac2f134cc54", + "sha256:240ce6dc19835971f38caf94b5738092cb1e641f8150a9ef9251b7825506cb05", + "sha256:294a6697dfa41a8cba4c365dd3715abc54d29a86a40ec6405d677ca853307cfb", + "sha256:3d55e36715ff06cdc0ab62f9591607c4324297b6b6ce5b58cb9928b3defe30ea", + "sha256:3dddf681284fe16d047d3ad37415b2e9ccdc6c8986c8062dbe51ab9a358b50a5", + "sha256:3f5f7e3a4ab824e3de1e1700f05ad76ee465f5f11f5db61c4b297ec29e692b2e", + "sha256:508999bec4422e646b05c95c598b64bdbef1edf0d2b715450a078ba21b385bcc", + "sha256:5d2a48c80cf5a338d58aae3c16872f4d452345e18350143b3bf7216d33ba7b99", + "sha256:5dc7a94bb11096bc4bffd41a3c4f2b958257085c01522aa81140c68b8bf1630a", + "sha256:65d653df249a2f95673976e4e9dd7ce10de61cfc6e64fa7eeaa6891a9559c581", + "sha256:7492af15f71f75ee93d2a618ca53fea8be85e7b625e323315169977fae752426", + "sha256:7f0055f1809b911ab347a25d786deff5e10e9cf083c3c3fd2dd04e8612e8d9db", + "sha256:807b3096205c7cec861c8803a6738e33ed86c9aae76cac0e19454245a6bbbc0a", + "sha256:81d6d8e39695f2c37954d1011c0480ef7cf444d4e3ae24bc5e89ee5de360139a", + "sha256:87c7c10d186f1743a8fd6a971ab6525d60abd5d5d200f31e073cd5e94d7e7a9d", + "sha256:8b42c0dc927b8d7c0eb59f97e6e34408e53bc489f9f90e66e568f329bff3e443", + "sha256:a00514362df15af041cc06e97aebabf2895e0a7c42c83c21894be12b84402d79", + "sha256:a39efc3ade8c1fb27c097fd112baf09d7fd70b8cb10ef1de4da6efbe066d381d", + "sha256:a4ee8000454ad4486fb9f28b0cab7fa1cd796fc36d639882d0b34109b5b3aec9", + "sha256:a7928283143a401e72a4fad43ecc85b35c27ae699cf5d54d39e1e72d97460e1d", + "sha256:adf4dd19d8875ac147bf926c727215a0faf21490b22c053db464e0bf0deb0485", + "sha256:ae8427a5e9062ba66fc2c62fb19a72276cf12c780e8db2b0956ea909c48acff5", + "sha256:b4c8b0bc5841e578d5fb32a16e0c305359b987b850a06964bd5a62739d688048", + "sha256:b84f29971f0ad4adaee391c6364e6f780d5aae7e9226d41964b26b49376071d0", + "sha256:c39c46d9e44447181cd502a35aad2bb178dbf1b1f86cf4db639d7b9614f837c6", + "sha256:cb2126603091902767d96bcb74093bd8b14982f41809f85c9b96e519c7e1dc41", + "sha256:dcef843f8de4e2ff5e35e96ec2a4abbdf403bd0f732ead127bd27e51f38ac298", + "sha256:e3447d9e074abf0e3cd85aef8131e01ab93f9f0e86654db7ac8a3f73c63706ce", + "sha256:f52010e0a44e3d8530437e7da38d11fb822acfb0d5b12e9cd5ba655509937ca0", + "sha256:f8196f739092a78e4f6b1b2172679ed3343c39c61a3e9d722ce6fcf1dac2824a" + ], + "markers": "python_version >= '3.6'", + "version": "==2.0.0" + }, + "httptools": { + "hashes": [ + "sha256:0a4b1b2012b28e68306575ad14ad5e9120b34fccd02a81eb08838d7e3bbb48be", + "sha256:3592e854424ec94bd17dc3e0c96a64e459ec4147e6d53c0a42d0ebcef9cb9c5d", + "sha256:41b573cf33f64a8f8f3400d0a7faf48e1888582b6f6e02b82b9bd4f0bf7497ce", + "sha256:56b6393c6ac7abe632f2294da53f30d279130a92e8ae39d8d14ee2e1b05ad1f2", + "sha256:86c6acd66765a934e8730bf0e9dfaac6fdcf2a4334212bd4a0a1c78f16475ca6", + "sha256:96da81e1992be8ac2fd5597bf0283d832287e20cb3cfde8996d2b00356d4e17f", + "sha256:96eb359252aeed57ea5c7b3d79839aaa0382c9d3149f7d24dd7172b1bcecb009", + "sha256:a2719e1d7a84bb131c4f1e0cb79705034b48de6ae486eb5297a139d6a3296dce", + "sha256:ac0aa11e99454b6a66989aa2d44bca41d4e0f968e395a0a8f164b401fefe359a", + "sha256:bc3114b9edbca5a1eb7ae7db698c669eb53eb8afbbebdde116c174925260849c", + "sha256:fa3cd71e31436911a44620473e873a256851e1f53dee56669dae403ba41756a4", + "sha256:fea04e126014169384dee76a153d4573d90d0cbd1d12185da089f73c78390437" + ], + "index": "pypi", + "version": "==0.1.1" + }, + "idna": { + "hashes": [ + "sha256:b307872f855b18632ce0c21c5e45be78c0ea7ae4c15c828c20788b26921eb3f6", + "sha256:b97d804b1e9b523befed77c48dacec60e6dcb0b5391d57af6a65a312a90648c0" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3'", + "version": "==2.10" + }, + "kafka-python": { + "hashes": [ + "sha256:04dfe7fea2b63726cd6f3e79a2d86e709d608d74406638c5da33a01d45a9d7e3", + "sha256:2d92418c7cb1c298fa6c7f0fb3519b520d0d7526ac6cb7ae2a4fc65a51a94b6e" + ], + "index": "pypi", + "version": "==2.0.2" + }, + "pipfile": { + "hashes": [ + "sha256:f7d9f15de8b660986557eb3cc5391aa1a16207ac41bc378d03f414762d36c984" + ], + "index": "pypi", + "version": "==0.0.2" + }, + "pydantic": { + "hashes": [ + "sha256:0c40162796fc8d0aa744875b60e4dc36834db9f2a25dbf9ba9664b1915a23850", + "sha256:20d42f1be7c7acc352b3d09b0cf505a9fab9deb93125061b376fbe1f06a5459f", + "sha256:2287ebff0018eec3cc69b1d09d4b7cebf277726fa1bd96b45806283c1d808683", + "sha256:258576f2d997ee4573469633592e8b99aa13bda182fcc28e875f866016c8e07e", + "sha256:26cf3cb2e68ec6c0cfcb6293e69fb3450c5fd1ace87f46b64f678b0d29eac4c3", + "sha256:2f2736d9a996b976cfdfe52455ad27462308c9d3d0ae21a2aa8b4cd1a78f47b9", + "sha256:3114d74329873af0a0e8004627f5389f3bb27f956b965ddd3e355fe984a1789c", + "sha256:3bbd023c981cbe26e6e21c8d2ce78485f85c2e77f7bab5ec15b7d2a1f491918f", + "sha256:3bcb9d7e1f9849a6bdbd027aabb3a06414abd6068cb3b21c49427956cce5038a", + "sha256:4bbc47cf7925c86a345d03b07086696ed916c7663cb76aa409edaa54546e53e2", + "sha256:6388ef4ef1435364c8cc9a8192238aed030595e873d8462447ccef2e17387125", + "sha256:830ef1a148012b640186bf4d9789a206c56071ff38f2460a32ae67ca21880eb8", + "sha256:8fbb677e4e89c8ab3d450df7b1d9caed23f254072e8597c33279460eeae59b99", + "sha256:c17a0b35c854049e67c68b48d55e026c84f35593c66d69b278b8b49e2484346f", + "sha256:dd4888b300769ecec194ca8f2699415f5f7760365ddbe243d4fd6581485fa5f0", + "sha256:dde4ca368e82791de97c2ec019681ffb437728090c0ff0c3852708cf923e0c7d", + "sha256:e3f8790c47ac42549dc8b045a67b0ca371c7f66e73040d0197ce6172b385e520", + "sha256:e8bc082afef97c5fd3903d05c6f7bb3a6af9fc18631b4cc9fedeb4720efb0c58", + "sha256:eb8ccf12295113ce0de38f80b25f736d62f0a8d87c6b88aca645f168f9c78771", + "sha256:fb77f7a7e111db1832ae3f8f44203691e15b1fa7e5a1cb9691d4e2659aee41c4", + "sha256:fbfb608febde1afd4743c6822c19060a8dbdd3eb30f98e36061ba4973308059e", + "sha256:fff29fe54ec419338c522b908154a2efabeee4f483e48990f87e189661f31ce3" + ], + "markers": "python_full_version >= '3.6.1'", + "version": "==1.8.1" + }, + "requests": { + "hashes": [ + "sha256:27973dd4a904a4f13b263a19c866c13b92a39ed1c964655f025f3f8d3d75b804", + "sha256:c210084e36a42ae6b9219e00e48287def368a26d03a048ddad7bfee44f75871e" + ], + "index": "pypi", + "version": "==2.25.1" + }, + "starlette": { + "hashes": [ + "sha256:bd2ffe5e37fb75d014728511f8e68ebf2c80b0fa3d04ca1479f4dc752ae31ac9", + "sha256:ebe8ee08d9be96a3c9f31b2cb2a24dbdf845247b745664bd8a3f9bd0c977fdbc" + ], + "markers": "python_version >= '3.6'", + "version": "==0.13.6" + }, + "toml": { + "hashes": [ + "sha256:806143ae5bfb6a3c6e736a764057db0e6a0e05e338b5630894a5f779cabb4f9b", + "sha256:b3bda1d108d5dd99f4a20d24d9c348e91c4db7ab1b749200bded2f839ccbe68f" + ], + "markers": "python_version >= '2.6' and python_version not in '3.0, 3.1, 3.2, 3.3'", + "version": "==0.10.2" + }, + "typing-extensions": { + "hashes": [ + "sha256:7cb407020f00f7bfc3cb3e7881628838e69d8f3fcab2f64742a5e76b2f841918", + "sha256:99d4073b617d30288f569d3f13d2bd7548c3a7e4c8de87db09a9d29bb3a4a60c", + "sha256:dafc7639cde7f1b6e1acc0f457842a83e722ccca8eef5270af2d74792619a89f" + ], + "index": "pypi", + "version": "==3.7.4.3" + }, + "urllib3": { + "hashes": [ + "sha256:2f4da4594db7e1e110a944bb1b551fdf4e6c136ad42e4234131391e21eb5b0df", + "sha256:e7b021f7241115872f92f43c6508082facffbd1c048e3c6e2bb9c2a157e28937" + ], + "markers": "python_version >= '2.7' and python_version not in '3.0, 3.1, 3.2, 3.3, 3.4' and python_version < '4'", + "version": "==1.26.4" + }, + "uvicorn": { + "hashes": [ + "sha256:3292251b3c7978e8e4a7868f4baf7f7f7bb7e40c759ecc125c37e99cdea34202", + "sha256:7587f7b08bd1efd2b9bad809a3d333e972f1d11af8a5e52a9371ee3a5de71524" + ], + "index": "pypi", + "version": "==0.13.4" + }, + "uvloop": { + "hashes": [ + "sha256:114543c84e95df1b4ff546e6e3a27521580466a30127f12172a3278172ad68bc", + "sha256:19fa1d56c91341318ac5d417e7b61c56e9a41183946cc70c411341173de02c69", + "sha256:2bb0624a8a70834e54dde8feed62ed63b50bad7a1265c40d6403a2ac447bce01", + "sha256:42eda9f525a208fbc4f7cecd00fa15c57cc57646c76632b3ba2fe005004f051d", + "sha256:44cac8575bf168601424302045234d74e3561fbdbac39b2b54cc1d1d00b70760", + "sha256:6de130d0cb78985a5d080e323b86c5ecaf3af82f4890492c05981707852f983c", + "sha256:7ae39b11a5f4cec1432d706c21ecc62f9e04d116883178b09671aa29c46f7a47", + "sha256:90e56f17755e41b425ad19a08c41dc358fa7bf1226c0f8e54d4d02d556f7af7c", + "sha256:b45218c99795803fb8bdbc9435ff7f54e3a591b44cd4c121b02fa83affb61c7c", + "sha256:e5e5f855c9bf483ee6cd1eb9a179b740de80cb0ae2988e3fa22309b78e2ea0e7" + ], + "index": "pypi", + "version": "==0.15.2" + } + }, + "develop": {} +} diff --git a/handler_data/__init__.py b/handler_data/__init__.py new file mode 100644 index 0000000..3909da1 --- /dev/null +++ b/handler_data/__init__.py @@ -0,0 +1,2 @@ +from .handler_user import HandlerUser +from .handler_event import HandlerEvent diff --git a/handler_data/handler_event.py b/handler_data/handler_event.py new file mode 100644 index 0000000..2dcae22 --- /dev/null +++ b/handler_data/handler_event.py @@ -0,0 +1,7 @@ +class HandlerEvent: + handler_link = [] + + def __init__(self, func): + HandlerEvent.handler_link.append(func) + + diff --git a/handler_data/handler_user.py b/handler_data/handler_user.py new file mode 100644 index 0000000..f558205 --- /dev/null +++ b/handler_data/handler_user.py @@ -0,0 +1,20 @@ +class HandlerUser: + handler_link = [] + + def __init__(self, func): + HandlerUser.handler_link.append(func) + + +@HandlerUser +async def device_label(rdb, item): + """ + 标记新设备 + :param rdb: + :param item: + :return: + """ + v = await rdb.execute('sadd', f'{item.game}.devices', item.properties.get('#device_id', '')) + if v: + item.properties['is_new_device'] = 1 + else: + item.properties['is_new_device'] = 0 \ No newline at end of file diff --git a/main.py b/main.py new file mode 100644 index 0000000..3c3b06c --- /dev/null +++ b/main.py @@ -0,0 +1,35 @@ +import uvicorn +from aioredis import create_redis_pool +from fastapi import FastAPI + +from routers import point +from settings import settings + +from utils.ta_sdk import TGAnalytics, ToKafka + +app = FastAPI() + + +def register_redis(app: FastAPI) -> None: + @app.on_event('startup') + async def startup_event(): + app.state.redis = await create_redis_pool(**settings.REDIS_CONF) + + @app.on_event('shutdown') + async def shutdown_event(): + app.state.redis.close() + await app.state.redis.wait_closed() + + +def register_ta(app: FastAPI) -> None: + @app.on_event('startup') + def startup_event(): + app.state.ta = TGAnalytics(ToKafka(settings.KAFKA_CONF)) + + +app.include_router(point.router, prefix='/v1') +register_redis(app) +register_ta(app) + +if __name__ == '__main__': + uvicorn.run(app='main:app', host="0.0.0.0", port=6666, reload=True, debug=True) diff --git a/routers/__init__.py b/routers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/routers/point.py b/routers/point.py new file mode 100644 index 0000000..53dbafa --- /dev/null +++ b/routers/point.py @@ -0,0 +1,57 @@ +import asyncio +import hashlib + +from fastapi import APIRouter, Request +from pydantic import BaseModel, validator + +from handler_data import HandlerUser, HandlerEvent +from settings import settings + +router = APIRouter() + + +class Item(BaseModel): + # sign = md5(distinct_id+account_id+act+ts+salt) + distinct_id: str + game: str + account_id: str + act: str + event_name: str = None + properties: dict + ts: int + sign: str + + @validator('sign') + def sign_validator(cls, v: str, values: dict): + s = f'{values.get("distinct_id")}{values.get("account_id", "")}{values.get("act", "")}{values.get("ts", "")}{settings.SALT}' + if hashlib.md5(s.encode()).hexdigest() == v: + return v + raise ValueError(f'打击违法犯罪行为{hashlib.md5(s.encode()).hexdigest()}') + + +@router.post("/point/") +async def point(request: Request, item: Item): + ta = getattr(request.app.state.ta, item.act) + # 将不同游戏发送到不同 topic_name + request.app.state.ta.consumer.topic_name = item.game + + rdb = request.app.state.redis + if ta and item.event_name and item.act == 'track': + await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerEvent.handler_link)) + await track(ta, item) + else: + await asyncio.gather(*map(lambda o: asyncio.create_task(o(rdb, item)), HandlerUser.handler_link)) + await user_set(ta, item) + results = {"code": 0, 'msg': 'ok'} + return results + + +async def track(ta, item): + ta(item.distinct_id, item.account_id, item.event_name, item.properties) + + +async def user_set(ta, item): + ta(item.distinct_id, item.account_id, item.properties) + + + diff --git a/settings.py b/settings.py new file mode 100644 index 0000000..674b684 --- /dev/null +++ b/settings.py @@ -0,0 +1,25 @@ +import json + + +class Config: + REDIS_CONF = { + 'address': ('192.168.0.161', 6379), + 'password': 'd1Gh*zp5', + 'db': 1 + } + SALT = '0r4X00mH' + KAFKA_CONF = { + 'bootstrap_servers': ["192.168.0.30:9092", "192.168.0.71:9092", "192.168.0.229:9092"], + 'value_serializer': lambda v: json.dumps(v).encode('utf-8'), + } + + +class Debug(Config): + REDIS_CONF = { + 'address': ('192.168.0.161', 6379), + 'password': 'd1Gh*zp5', + 'db': 0 + } + + +settings = Debug diff --git a/utils/__init__.py b/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/utils/ta_sdk.py b/utils/ta_sdk.py new file mode 100644 index 0000000..7f8bc16 --- /dev/null +++ b/utils/ta_sdk.py @@ -0,0 +1,930 @@ +# encoding:utf-8 + +from __future__ import unicode_literals + +import datetime +import gzip +import json +import os +import random +import re +import threading +import time +import uuid + +import requests +from kafka import KafkaProducer +from requests import ConnectionError + +try: + import queue + from urllib.parse import urlparse +except ImportError: + import Queue as queue + from urlparse import urlparse +try: + isinstance("", basestring) + + + def is_str(s): + return isinstance(s, basestring) +except NameError: + def is_str(s): + return isinstance(s, str) +try: + isinstance(1, long) + + + def is_int(n): + return isinstance(n, int) or isinstance(n, long) +except NameError: + def is_int(n): + return isinstance(n, int) + +try: + from enum import Enum + + ROTATE_MODE = Enum('ROTATE_MODE', ('DAILY', 'HOURLY')) +except ImportError: + class ROTATE_MODE(object): + DAILY = 0 + HOURLY = 1 + + +class TGAException(Exception): + pass + + +class TGAIllegalDataException(TGAException): + """数据格式异常 + + 在发送的数据格式有误时,SDK 会抛出此异常,用户应当捕获并处理. + """ + pass + + +class TGANetworkException(TGAException): + """网络异常 + + 在因为网络或者不可预知的问题导致数据无法发送时,SDK会抛出此异常,用户应当捕获并处理. + """ + pass + + +__version__ = '1.6.0' + + +class TGAnalytics(object): + """TGAnalytics 实例是发送事件数据和用户属性数据的关键实例 + """ + + __NAME_PATTERN = re.compile(r"^(#[a-z][a-z0-9_]{0,49})|([a-z][a-z0-9_]{0,50})$", re.I) + + def __init__(self, consumer, enable_uuid=False): + """创建一个 TGAnalytics 实例 + + TGAanlytics 需要与指定的 Consumer 一起使用,可以使用以下任何一种: + - LoggingConsumer: 批量实时写本地文件,并与 LogBus 搭配 + - BatchConsumer: 批量实时地向TA服务器传输数据(同步阻塞),不需要搭配传输工具 + - AsyncBatchConsumer: 批量实时地向TA服务器传输数据(异步非阻塞),不需要搭配传输工具 + - DebugConsumer: 逐条发送数据,并对数据格式做严格校验 + + Args: + consumer: 指定的 Consumer + """ + self.__consumer = consumer + self.__enableUuid = enable_uuid + self.__super_properties = {} + self.clear_super_properties() + + @property + def consumer(self): + """ + 用了更换 kafka topic_name + :return: + """ + return self.__consumer + + def user_set(self, distinct_id=None, account_id=None, properties=None): + """设置用户属性 + + 对于一般的用户属性,您可以调用 user_set 来进行设置。使用该接口上传的属性将会覆盖原有的属性值,如果之前不存在该用户属性, + 则会新建该用户属性,类型与传入属性的类型一致. + + Args: + distinct_id: 访客 ID + account_id: 账户 ID + properties: dict 类型的用户属性 + """ + self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_set', properties_add=properties) + + def user_unset(self, distinct_id=None, account_id=None, properties=None): + """ + 删除某个用户的用户属性 + :param distinct_id: + :param account_id: + :param properties: + """ + if isinstance(properties, list): + properties = dict((key, 0) for key in properties) + self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_unset', properties_add=properties) + + def user_setOnce(self, distinct_id=None, account_id=None, properties=None): + """设置用户属性, 不覆盖已存在的用户属性 + + 如果您要上传的用户属性只要设置一次,则可以调用 user_setOnce 来进行设置,当该属性之前已经有值的时候,将会忽略这条信息. + + Args: + distinct_id: 访客 ID + account_id: 账户 ID + properties: dict 类型的用户属性 + """ + self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_setOnce', properties_add=properties) + + def user_add(self, distinct_id=None, account_id=None, properties=None): + """对指定的数值类型的用户属性进行累加操作 + + 当您要上传数值型的属性时,您可以调用 user_add 来对该属性进行累加操作. 如果该属性还未被设置,则会赋值0后再进行计算. + 可传入负值,等同于相减操作. + + Args: + distinct_id: 访客 ID + account_id: 账户 ID + properties: 数值类型的用户属性 + """ + self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_add', properties_add=properties) + + def user_append(self, distinct_id=None, account_id=None, properties=None): + """追加一个用户的某一个或者多个集合类型 + Args: + distinct_id: 访客 ID + account_id: 账户 ID + properties: 集合 + """ + self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_append', properties_add=properties) + + def user_del(self, distinct_id=None, account_id=None): + """删除用户 + + 如果您要删除某个用户,可以调用 user_del 将该用户删除。调用此函数后,将无法再查询该用户的用户属性, 但该用户产生的事件仍然可以被查询到. + + Args: + distinct_id: 访客 ID + account_id: 账户 ID + """ + self.__add(distinct_id=distinct_id, account_id=account_id, send_type='user_del') + + def track(self, distinct_id=None, account_id=None, event_name=None, properties=None): + """发送事件数据 + + 您可以调用 track 来上传事件,建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头,可包含数字,字母和下划线“_”, + 长度最大为 50 个字符,对字母大小写不敏感. 事件的属性是一个 dict 对象,其中每个元素代表一个属性. + + Args: + distinct_id: 访客 ID + account_id: 账户 ID + event_name: 事件名称 + properties: 事件属性 + + Raises: + TGAIllegalDataException: 数据格式错误时会抛出此异常 + """ + all_properties = self._public_track_add(event_name) + + if properties: + all_properties.update(properties) + + self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track', event_name=event_name, + properties_add=all_properties) + + def track_update(self, distinct_id=None, account_id=None, event_name=None, event_id=None, properties=None): + """发送可更新的事件数据 + + 您可以调用 track_update 来上传可更新的事件,建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头,可包含数字,字母和下划线“_”, + 长度最大为 50 个字符,对字母大小写不敏感. 事件的属性是一个 dict 对象,其中每个元素代表一个属性. + + Args: + distinct_id: 访客 ID + account_id: 账户 ID + event_name: 事件名称 + event_id: 事件唯一ID + properties: 事件属性 + + Raises: + TGAIllegalDataException: 数据格式错误时会抛出此异常 + """ + all_properties = self._public_track_add(event_name) + + if properties: + all_properties.update(properties) + + self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track_update', event_name=event_name, + event_id=event_id, properties_add=all_properties) + + def track_overwrite(self, distinct_id=None, account_id=None, event_name=None, event_id=None, properties=None): + """发送可覆盖的事件数据 + + 您可以调用 track_overwrite 来上传可全部覆盖的事件,建议您根据先前梳理的文档来设置事件的属性以及发送信息的条件. 事件的名称只能以字母开头,可包含数字,字母和下划线“_”, + 长度最大为 50 个字符,对字母大小写不敏感. 事件的属性是一个 dict 对象,其中每个元素代表一个属性. + + Args: + distinct_id: 访客 ID + account_id: 账户 ID + event_name: 事件名称 + event_id: 事件唯一ID + properties: 事件属性 + + Raises: + TGAIllegalDataException: 数据格式错误时会抛出此异常 + """ + all_properties = self._public_track_add(event_name) + + if properties: + all_properties.update(properties) + + self.__add(distinct_id=distinct_id, account_id=account_id, send_type='track_overwrite', event_name=event_name, + event_id=event_id, properties_add=all_properties) + + def flush(self): + """立即提交数据到相应的接收端 + """ + self.__consumer.flush() + + def close(self): + """关闭并退出 sdk + + 请在退出前调用本接口,以避免缓存内的数据丢失 + """ + self.__consumer.close() + + def _public_track_add(self, event_name): + if not is_str(event_name): + raise TGAIllegalDataException('a string type event_name is required for track') + + all_properties = { + '#lib': 'tga_python_sdk', + '#lib_version': __version__, + } + all_properties.update(self.__super_properties) + return all_properties + pass + + def __add(self, distinct_id, account_id, send_type, event_name=None, event_id=None, properties_add=None): + if distinct_id is None and account_id is None: + raise TGAException("Distinct_id and account_id must be set at least one") + + if properties_add: + properties = properties_add.copy() + else: + properties = {} + data = { + '#type': send_type + } + if "#ip" in properties.keys(): + data['#ip'] = properties.get("#ip") + del (properties['#ip']) + if "#first_check_id" in properties.keys(): + data['#first_check_id'] = properties.get("#first_check_id") + del (properties['#first_check_id']) + # 只支持UUID标准格式xxxxxxxx - xxxx - xxxx - xxxx - xxxxxxxxxxxx + if "#uuid" in properties.keys(): + data['#uuid'] = str(properties['#uuid']) + del (properties['#uuid']) + elif self.__enableUuid: + data['#uuid'] = str(uuid.uuid1()) + if "#app_id" in properties.keys(): + data['#app_id'] = properties.get("#app_id") + del (properties['#app_id']) + + self.__assert_properties(send_type, properties) + td_time = properties.get("#time") + data['#time'] = td_time + del (properties['#time']) + + data['properties'] = properties + + if event_name is not None: + data['#event_name'] = event_name + if event_id is not None: + data['#event_id'] = event_id + if distinct_id is not None: + data['#distinct_id'] = distinct_id + if account_id is not None: + data['#account_id'] = account_id + + self.__consumer.add(json.dumps(data)) + + def __assert_properties(self, action_type, properties): + if properties is not None: + if "#time" not in properties.keys(): + properties['#time'] = datetime.datetime.now() + else: + try: + time_temp = properties.get('#time') + if isinstance(time_temp, datetime.datetime) or isinstance(time_temp, datetime.date): + pass + else: + raise TGAIllegalDataException('Value of #time should be datetime.datetime or datetime.date') + except Exception as e: + raise TGAIllegalDataException(e) + + for key, value in properties.items(): + if not is_str(key): + raise TGAIllegalDataException("Property key must be a str. [key=%s]" % str(key)) + + if value is None: + continue + + if not self.__NAME_PATTERN.match(key): + raise TGAIllegalDataException( + "type[%s] property key must be a valid variable name. [key=%s]" % (action_type, str(key))) + + if not is_str(value) and not is_int(value) and not isinstance(value, float) \ + and not isinstance(value, bool) \ + and not isinstance(value, datetime.datetime) and not isinstance(value, datetime.date) \ + and not isinstance(value, list): + raise TGAIllegalDataException( + "property value must be a str/int/float/bool/datetime/date/list. [value=%s]" % type(value)) + + if 'user_add' == action_type.lower() and not self.__number(value) and not key.startswith('#'): + raise TGAIllegalDataException('user_add properties must be number type') + + if isinstance(value, datetime.datetime): + properties[key] = value.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + elif isinstance(value, datetime.date): + properties[key] = value.strftime('%Y-%m-%d') + if isinstance(value, list): + i = 0 + for lvalue in value: + if isinstance(lvalue, datetime.datetime): + value[i] = lvalue.strftime('%Y-%m-%d %H:%M:%S.%f')[:-3] + i += 1 + + def __number(self, s): + if is_int(s): + return True + if isinstance(s, float): + return True + return False + + def clear_super_properties(self): + """删除所有已设置的事件公共属性 + """ + self.__super_properties = { + '#lib': 'tga_python_sdk', + '#lib_version': __version__, + } + + def set_super_properties(self, super_properties): + """设置公共事件属性 + + 公共事件属性是所有事件中的属性属性,建议您在发送事件前,先设置公共事件属性. 当 track 的 properties 和 + super properties 有相同的 key 时,track 的 properties 会覆盖公共事件属性的值. + + Args: + super_properties 公共属性 + """ + self.__super_properties.update(super_properties) + + +if os.name == 'nt': + import msvcrt + + + def _lock(file_): + try: + save_pos = file_.tell() + file_.seek(0) + try: + msvcrt.locking(file_.fileno(), msvcrt.LK_LOCK, 1) + except IOError as e: + raise TGAException(e) + finally: + if save_pos: + file_.seek(save_pos) + except IOError as e: + raise TGAException(e) + + + def _unlock(file_): + try: + save_pos = file_.tell() + if save_pos: + file_.seek(0) + try: + msvcrt.locking(file_.fileno(), msvcrt.LK_UNLCK, 1) + except IOError as e: + raise TGAException(e) + finally: + if save_pos: + file_.seek(save_pos) + except IOError as e: + raise TGAException(e) +elif os.name == 'posix': + import fcntl + + + def _lock(file_): + try: + fcntl.flock(file_.fileno(), fcntl.LOCK_EX) + except IOError as e: + raise TGAException(e) + + + def _unlock(file_): + fcntl.flock(file_.fileno(), fcntl.LOCK_UN) +else: + raise TGAException("Python SDK is defined for NT and POSIX system.") + + +class _TAFileLock(object): + def __init__(self, file_handler): + self._file_handler = file_handler + + def __enter__(self): + _lock(self._file_handler) + return self + + def __exit__(self, t, v, tb): + _unlock(self._file_handler) + + +class LoggingConsumer(object): + """数据批量实时写入本地文件 + + 创建指定文件存放目录的 LoggingConsumer, 将数据使用 logging 库输出到指定路径. 同时,需将 LogBus 的监听文件夹地址 + 设置为此处的地址,即可使用LogBus进行数据的监听上传. + """ + _mutex = queue.Queue() + _mutex.put(1) + + class _FileWriter(object): + _writers = {} + _writeMutex = queue.Queue() + _writeMutex.put(1) + + @classmethod + def instance(cls, filename): + cls._writeMutex.get(block=True, timeout=None) + try: + if filename in cls._writers.keys(): + result = cls._writers[filename] + result._count = result._count + 1 + else: + result = cls(filename) + cls._writers[filename] = result + return result + finally: + cls._writeMutex.put(1) + + def __init__(self, filename): + self._filename = filename + self._file = open(self._filename, 'a') + self._count = 1 + + def close(self): + LoggingConsumer._FileWriter._writeMutex.get(block=True, timeout=None) + try: + self._count = self._count - 1 + if self._count == 0: + self._file.close() + del LoggingConsumer._FileWriter._writers[self._filename] + finally: + LoggingConsumer._FileWriter._writeMutex.put(1) + + def is_valid(self, filename): + return self._filename == filename + + def write(self, messages): + with _TAFileLock(self._file): + for message in messages: + self._file.write(message) + self._file.write('\n') + self._file.flush() + + @classmethod + def construct_filename(cls, directory, date_suffix, file_size, file_prefix): + filename = file_prefix + ".log." + date_suffix \ + if file_prefix is not None else "log." + date_suffix + + if file_size > 0: + count = 0 + file_path = directory + filename + "_" + str(count) + while os.path.exists(file_path) and cls.file_size_out(file_path, file_size): + count = count + 1 + file_path = directory + filename + "_" + str(count) + return file_path + else: + return directory + filename + + @classmethod + def file_size_out(cls, file_path, file_size): + fsize = os.path.getsize(file_path) + fsize = fsize / float(1024 * 1024) + if fsize >= file_size: + return True + return False + + @classmethod + def unlock_logging_consumer(cls): + cls._mutex.put(1) + + @classmethod + def lock_logging_consumer(cls): + cls._mutex.get(block=True, timeout=None) + + def __init__(self, log_directory, log_size=0, buffer_size=8192, rotate_mode=ROTATE_MODE.DAILY, file_prefix=None): + """创建指定日志文件目录的 LoggingConsumer + + Args: + log_directory: 日志保存目录 + log_size: 单个日志文件的大小, 单位 MB, log_size <= 0 表示不限制单个文件大小 + buffer_size: 每次写入文件的大小, 单位 Byte, 默认 8K + rotate_mode: 日志切分模式,默认按天切分 + """ + if not os.path.exists(log_directory): + os.makedirs(log_directory) + self.log_directory = log_directory # log文件保存的目录 + self.sdf = '%Y-%m-%d-%H' if rotate_mode == ROTATE_MODE.HOURLY else '%Y-%m-%d' + self.suffix = datetime.datetime.now().strftime(self.sdf) + self._fileSize = log_size # 单个log文件的大小 + if not self.log_directory.endswith("/"): + self.log_directory = self.log_directory + "/" + + self._buffer = [] + self._buffer_size = buffer_size + self._file_prefix = file_prefix + self.lock_logging_consumer() + filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize, + self._file_prefix) + self._writer = LoggingConsumer._FileWriter.instance(filename) + self.unlock_logging_consumer() + + def add(self, msg): + messages = None + self.lock_logging_consumer() + self._buffer.append(msg) + if len(self._buffer) > self._buffer_size: + messages = self._buffer + date_suffix = datetime.datetime.now().strftime(self.sdf) + if self.suffix != date_suffix: + self.suffix = date_suffix + + filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize, + self._file_prefix) + if not self._writer.is_valid(filename): + self._writer.close() + self._writer = LoggingConsumer._FileWriter.instance(filename) + self._buffer = [] + if messages: + self._writer.write(messages) + self.unlock_logging_consumer() + + def flush_with_close(self, is_close): + messages = None + self.lock_logging_consumer() + if len(self._buffer) > 0: + messages = self._buffer + filename = LoggingConsumer.construct_filename(self.log_directory, self.suffix, self._fileSize, + self._file_prefix) + if not self._writer.is_valid(filename): + self._writer.close() + self._writer = LoggingConsumer._FileWriter.instance(filename) + self._buffer = [] + if messages: + self._writer.write(messages) + if is_close: + self._writer.close() + self.unlock_logging_consumer() + + def flush(self): + self.flush_with_close(False) + + def close(self): + self.flush_with_close(True) + + +class BatchConsumer(object): + """同步、批量地向 TA 服务器传输数据 + + 通过指定接收端地址和 APP ID,可以同步的向 TA 服务器传输数据. 此 Consumer 不需要搭配传输工具, + 但是存在网络不稳定等原因造成数据丢失的可能,因此不建议在生产环境中使用. + + 触发上报的时机为以下条件满足其中之一的时候: + 1. 数据条数大于预定义的最大值, 默认为 20 条 + 2. 数据发送间隔超过预定义的最大时间, 默认为 3 秒 + """ + _batchlock = threading.RLock() + _cachelock = threading.RLock() + + def __init__(self, server_uri, appid, batch=20, timeout=30000, interval=3, compress=True, maxCacheSize=50): + """创建 BatchConsumer + + Args: + server_uri: 服务器的 URL 地址 + appid: 项目的 APP ID + batch: 指定触发上传的数据条数, 默认为 20 条, 最大 200 条 + timeout: 请求的超时时间, 单位毫秒, 默认为 30000 ms + interval: 推送数据的最大时间间隔, 单位为秒, 默认为 3 秒 + """ + self.__interval = interval + self.__batch = min(batch, 200) + self.__message_channel = [] + self.__maxCacheSize = maxCacheSize + self.__cache_buffer = [] + self.__last_flush = time.time() + server_url = urlparse(server_uri) + self.__http_service = _HttpServices(server_url._replace(path='/sync_server').geturl(), appid, timeout) + self.__http_service.compress = compress + + def add(self, msg): + self._batchlock.acquire() + try: + self.__message_channel.append(msg) + finally: + self._batchlock.release() + if len(self.__message_channel) >= self.__batch \ + or len(self.__cache_buffer) > 0: + self.flush_once() + + def flush(self, throw_exception=True): + while len(self.__cache_buffer) > 0 or len(self.__message_channel) > 0: + try: + self.flush_once(throw_exception) + except TGAIllegalDataException: + continue + + def flush_once(self, throw_exception=True): + if len(self.__message_channel) == 0 and len(self.__cache_buffer) == 0: + return + + self._cachelock.acquire() + self._batchlock.acquire() + try: + try: + if len(self.__message_channel) == 0 and len(self.__cache_buffer) == 0: + return + if len(self.__cache_buffer) == 0 or len(self.__message_channel) >= self.__batch: + self.__cache_buffer.append(self.__message_channel) + self.__message_channel = [] + finally: + self._batchlock.release() + msg = self.__cache_buffer[0] + self.__http_service.send('[' + ','.join(msg) + ']', str(len(msg))) + self.__last_flush = time.time() + self.__cache_buffer = self.__cache_buffer[1:] + except TGANetworkException as e: + if throw_exception: + raise e + except TGAIllegalDataException as e: + self.__cache_buffer = self.__cache_buffer[1:] + if throw_exception: + raise e + finally: + self._cachelock.release() + + def close(self): + self.flush() + + pass + + +class AsyncBatchConsumer(object): + """异步、批量地向 TA 服务器发送数据的 + + AsyncBatchConsumer 使用独立的线程进行数据发送,当满足以下两个条件之一时触发数据上报: + 1. 数据条数大于预定义的最大值, 默认为 20 条 + 2. 数据发送间隔超过预定义的最大时间, 默认为 3 秒 + """ + + def __init__(self, server_uri, appid, interval=3, flush_size=20, queue_size=100000): + """创建 AsyncBatchConsumer + + Args: + server_uri: 服务器的 URL 地址 + appid: 项目的 APP ID + interval: 推送数据的最大时间间隔, 单位为秒, 默认为 3 秒 + flush_size: 队列缓存的阈值,超过此值将立即进行发送 + queue_size: 缓存队列的大小 + """ + server_url = urlparse(server_uri) + self.__http_service = _HttpServices(server_url._replace(path='/sync_server').geturl(), appid, 30000) + self.__batch = flush_size + self.__queue = queue.Queue(queue_size) + + # 初始化发送线程 + self.__flushing_thread = self._AsyncFlushThread(self, interval) + self.__flushing_thread.daemon = True + self.__flushing_thread.start() + + def add(self, msg): + try: + self.__queue.put_nowait(msg) + except queue.Full as e: + raise TGANetworkException(e) + + if self.__queue.qsize() > self.__batch: + self.flush() + + def flush(self): + self.__flushing_thread.flush() + + def close(self): + self.__flushing_thread.stop() + while not self.__queue.empty(): + self._perform_request() + + def _perform_request(self): + """同步的发送数据 + + 仅用于内部调用, 用户不应当调用此方法. + """ + flush_buffer = [] + while len(flush_buffer) < self.__batch: + try: + flush_buffer.append(str(self.__queue.get_nowait())) + except queue.Empty: + break + + if len(flush_buffer) > 0: + for i in range(3): # 网络异常情况下重试 3 次 + try: + self.__http_service.send('[' + ','.join(flush_buffer) + ']', str(len(flush_buffer))) + return True + except TGANetworkException: + pass + except TGAIllegalDataException: + break + + class _AsyncFlushThread(threading.Thread): + def __init__(self, consumer, interval): + threading.Thread.__init__(self) + self._consumer = consumer + self._interval = interval + + self._stop_event = threading.Event() + self._finished_event = threading.Event() + self._flush_event = threading.Event() + + def flush(self): + self._flush_event.set() + + def stop(self): + """停止线程 + + 退出时需调用此方法,以保证线程安全结束. + """ + self._stop_event.set() + self._finished_event.wait() + + def run(self): + while True: + # 如果 _flush_event 标志位为 True,或者等待超过 _interval 则继续执行 + self._flush_event.wait(self._interval) + self._consumer._perform_request() + self._flush_event.clear() + + # 发现 stop 标志位时安全退出 + if self._stop_event.isSet(): + break + self._finished_event.set() + + +def _gzip_string(data): + try: + return gzip.compress(data) + except AttributeError: + import StringIO + buf = StringIO.StringIO() + fd = gzip.GzipFile(fileobj=buf, mode="w") + fd.write(data) + fd.close() + return buf.getvalue() + + +class _HttpServices(object): + """内部类,用于发送网络请求 + + 指定接收端地址和项目 APP ID, 实现向接收端上传数据的接口. 发送前将数据默认使用 Gzip 压缩, + """ + + def __init__(self, server_uri, appid, timeout=30000): + self.url = server_uri + self.appid = appid + self.timeout = timeout + self.compress = True + + def send(self, data, length): + """使用 Requests 发送数据给服务器 + + Args: + data: 待发送的数据 + length + + Raises: + TGAIllegalDataException: 数据错误 + TGANetworkException: 网络错误 + """ + headers = {'appid': self.appid, 'TA-Integration-Type': 'python-sdk', 'TA-Integration-Version': __version__, + 'TA-Integration-Count': length} + try: + compress_type = 'gzip' + if self.compress: + data = _gzip_string(data.encode("utf-8")) + else: + compress_type = 'none' + data = data.encode("utf-8") + headers['compress'] = compress_type + response = requests.post(self.url, data=data, headers=headers, timeout=self.timeout) + if response.status_code == 200: + responseData = json.loads(response.text) + if responseData["code"] == 0: + return True + else: + raise TGAIllegalDataException("Unexpected result code: " + str(responseData["code"])) + else: + raise TGANetworkException("Unexpected Http status code " + str(response.status_code)) + except ConnectionError as e: + time.sleep(0.5) + raise TGANetworkException("Data transmission failed due to " + repr(e)) + + +class DebugConsumer(object): + """逐条、同步的发送数据给接收服务器 + + 服务端会对数据进行严格校验,当某个属性不符合规范时,整条数据都不会入库. 当数据格式错误时抛出包含详细原因的异常信息. + 建议首先使用此 Consumer 来调试埋点数据. + """ + + def __init__(self, server_uri, appid, timeout=30000, write_data=True): + """创建 DebugConsumer + + Args: + server_uri: 服务器的 URL 地址 + appid: 项目的 APP ID + timeout: 请求的超时时间, 单位毫秒, 默认为 30000 ms + """ + server_url = urlparse(server_uri) + debug_url = server_url._replace(path='/data_debug') + self.__server_uri = debug_url.geturl() + self.__appid = appid + self.__timeout = timeout + self.__writer_data = write_data + + def add(self, msg): + try: + dry_run = 0 + if not self.__writer_data: + dry_run = 1 + response = requests.post(self.__server_uri, + data={'source': 'server', 'appid': self.__appid, 'data': msg, 'dryRun': dry_run}, + timeout=self.__timeout) + if response.status_code == 200: + responseData = json.loads(response.text) + if responseData["errorLevel"] == 0: + return True + else: + print("Unexpected result : \n %s" % response.text) + else: + raise TGANetworkException("Unexpected http status code: " + str(response.status_code)) + except ConnectionError as e: + time.sleep(0.5) + raise TGANetworkException("Data transmission failed due to " + repr(e)) + + def flush(self, throw_exception=True): + pass + + def close(self): + pass + + +class ToKafka(object): + """ + 将数据发送到kafka + 注意 减少不必要的查询 分区固定设置16个 + """ + + def __init__(self, conf): + self.__topic_name = None + self.__producer = KafkaProducer(**conf) + + @property + def topic_name(self): + return self.__topic_name + + @topic_name.setter + def topic_name(self, topic_name): + self.__topic_name = topic_name + # self.__producer.partitions_for(topic_name) + + def add(self, msg): + try: + self.__producer.send(self.__topic_name, msg, partition=random.randint(0, 15)) + except Exception as e: + print(e) + + def flush(self, throw_exception=True): + pass + + def close(self): + pass